Skip to content

Commit

Permalink
Merge pull request #22 from el-tegy/spark_streaming
Browse files Browse the repository at this point in the history
Spark streaming
  • Loading branch information
el-tegy authored Nov 26, 2023
2 parents 32ddb5e + 37b920a commit cd01590
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions stream_persona/src/spark_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,FloatType,IntegerType,StringType
from pyspark.sql.functions import from_json,col

logging.basicConfig(level=logging.INFO,
format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
logger = logging.getLogger("spark_structured_streaming")


def create_spark_session():
"""
Creates the Spark Session with suitable configs.
"""
try:
# Spark session is established with cassandra and kafka jars. Suitable versions can be found in Maven repository.
spark = SparkSession \
.builder \
.appName("SparkStructuredStreaming") \
.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
.config("spark.cassandra.connection.host", "cassandra") \
.config("spark.cassandra.connection.port","9042")\
.config("spark.cassandra.auth.username", "cassandra") \
.config("spark.cassandra.auth.password", "cassandra") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
logging.info('Spark session created successfully')
except Exception:
logging.error("Couldn't create the spark session")

return spark

0 comments on commit cd01590

Please sign in to comment.