Skip to content

Commit

Permalink
Refactor usability tests.
Browse files Browse the repository at this point in the history
>
>
Co-authored-by: Elisée TEGUE <[email protected]>
Co-authored-by: Mic E <[email protected]>
  • Loading branch information
el-tegy committed Nov 26, 2023
1 parent 328572d commit 0d3b914
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion stream_persona/src/spark_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
from pyspark.sql.types import StructType,StructField,FloatType,IntegerType,StringType
from pyspark.sql.functions import from_json,col

logging.basicConfig(level=logging.INFO,
format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")


def create_spark_session():
"""
Creates the Spark Session with suitable configs.
Expand All @@ -23,4 +28,24 @@ def create_spark_session():
except Exception:
logging.error("Couldn't create the spark session")

return spark
return spark

def create_initial_dataframe(spark_session):
"""
Reads the streaming data and creates the initial dataframe accordingly.
"""
try:
# Gets the streaming data from topic random_names
df = spark_session \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka1:19092,kafka2:19093,kafka3:19094") \
.option("subscribe", "random_names") \
.option("delimeter",",") \
.option("startingOffsets", "earliest") \
.load()
logging.info("Initial dataframe created successfully")
except Exception as e:
logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")

return df

0 comments on commit 0d3b914

Please sign in to comment.