feat: Spark Streaming using Spark Operator (awslabs#516)
lusoal authored May 16, 2024
1 parent e1fb650 commit 6fa0ca6
Showing 38 changed files with 3,236 additions and 0 deletions.
49 changes: 49 additions & 0 deletions streaming/spark-streaming/examples/consumer/Dockerfile
@@ -0,0 +1,49 @@
# Dockerfile for Apache Spark with additional JARs downloaded at build time
FROM apache/spark-py:v3.3.2
WORKDIR /app

# Use root to create a new user and configure permissions
USER root

# Install wget to download JAR files
RUN apt-get update && apt-get install -y wget && \
rm -rf /var/lib/apt/lists/*

# Create a new user 'spark-user' with UID 1001
RUN groupadd -r spark-group && useradd -r -u 1001 -g spark-group spark-user

RUN mkdir -p /home/spark-user/.ivy2/cache
RUN mkdir -p /app/jars

RUN cd /app/jars && \
wget -q "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar" && \
wget -q "https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.2/hadoop-client-api-3.3.2.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.2/hadoop-client-runtime-3.3.2.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.0.0/iceberg-spark-runtime-3.3_2.12-1.0.0.jar" && \
wget -q "https://repo1.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar" && \
wget -q "https://repo1.maven.org/maven2/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/parquet/parquet-avro/1.12.3/parquet-avro-1.12.3.jar" && \
wget -q "https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar" && \
wget -q "https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar" && \
wget -q "https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.2/spark-sql-kafka-0-10_2.12-3.3.2.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.3.2/spark-tags_2.12-3.3.2.jar" && \
wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.2/spark-token-provider-kafka-0-10_2.12-3.3.2.jar" && \
wget -q "https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar"

# Set the owner of the Ivy cache directory and /app to the new user
RUN chown -R spark-user:spark-group /home/spark-user/
RUN chown -R spark-user:spark-group /app/

# Switch to the new user for running the application
USER spark-user

# Add the Spark application script to the container
ADD app.py /app

# Set the entry point for the container
ENTRYPOINT ["/opt/entrypoint.sh"]
81 changes: 81 additions & 0 deletions streaming/spark-streaming/examples/consumer/app.py
@@ -0,0 +1,81 @@
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json
import os

# Variables
s3_bucket_name = os.getenv("S3_BUCKET_NAME", "my-iceberg-data-bucket")
kafka_address = os.getenv("KAFKA_ADDRESS", 'b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092')

def create_spark_session():
spark = SparkSession.builder \
.appName("KafkaToIceberg") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2") \
.config("spark.jars.repositories", "https://repo1.maven.org/maven2/") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type", "hadoop") \
.config("spark.sql.catalog.local.warehouse", f"s3a://{s3_bucket_name}/iceberg/warehouse/") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
.config("spark.sql.warehouse.dir", f"s3a://{s3_bucket_name}/iceberg/warehouse/") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.kryo.registrationRequired", "false") \
.getOrCreate()
return spark

def consume_and_write():
spark = create_spark_session()
    # Reduce log verbosity; switch to DEBUG when troubleshooting
spark.sparkContext.setLogLevel("ERROR")
# Create the table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS local.my_table (
id STRING,
timestamp STRING,
alert_type STRING,
severity STRING,
description STRING
)
USING iceberg
LOCATION 's3a://{s3_bucket_name}/iceberg/warehouse/my_table'
TBLPROPERTIES (
'write.format.default'='parquet' -- Explicitly specifying Parquet format
)
""")

# Read from Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_address) \
.option("subscribe", "security-topic") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.load()

# Define the schema for the JSON data
json_schema = StructType([
StructField("id", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("alert_type", StringType(), True),
StructField("severity", StringType(), True),
StructField("description", StringType(), True)
])

# Parse JSON and select the required columns
parsed_df = df.selectExpr("CAST(value AS STRING) as json") \
.select(from_json("json", json_schema).alias("data")) \
.select("data.id", "data.timestamp", "data.alert_type", "data.severity", "data.description")

    # Write the stream to the Iceberg table at its warehouse path
query = parsed_df.writeStream \
.format("iceberg") \
.option("checkpointLocation", f"s3a://{s3_bucket_name}/iceberg/checkpoints/") \
.option("path", f"s3a://{s3_bucket_name}/iceberg/warehouse/my_table") \
.outputMode("append") \
.start()

query.awaitTermination() # Wait for the stream to finish

if __name__ == "__main__":
consume_and_write()
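
For reference, the consumer expects each Kafka record value on the security-topic topic to be a JSON object whose fields line up with json_schema above. The snippet below is a hypothetical illustration of that message shape (all field values are made up) using only the Python standard library:

import json

# Hypothetical alert record matching the consumer's json_schema; every value here is illustrative.
sample_alert = {
    "id": "9f1c2a4e-0b3d-4c55-a1e2-7d8f90ab1234",
    "timestamp": "2024-05-16T12:34:56Z",
    "alert_type": "unauthorized_access",
    "severity": "high",
    "description": "Example alert used to illustrate the expected message shape",
}

# Producers are expected to publish the serialized form as the Kafka record value.
payload = json.dumps(sample_alert).encode("utf-8")
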
@@ -0,0 +1,109 @@
# This manifest runs the spark-consumer application on EKS via the Spark Operator.
# Replace __MY_BUCKET_NAME__ and __MY_KAFKA_BROKERS_ADRESS__ below with values from your environment.
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-consumer
namespace: spark-team-a
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "public.ecr.aws/data-on-eks/consumer-spark-streaming-3.3.2-kafka:1" # You can build your own image using the Dockerfile in this folder
mainApplicationFile: "local:///app/app.py"
sparkVersion: "3.3.2"
deps:
jars:
- "local:///app/jars/commons-logging-1.1.3.jar"
- "local:///app/jars/commons-pool2-2.11.1.jar"
- "local:///app/jars/hadoop-client-api-3.3.2.jar"
- "local:///app/jars/hadoop-client-runtime-3.3.2.jar"
- "local:///app/jars/jsr305-3.0.0.jar"
- "local:///app/jars/kafka-clients-2.8.1.jar"
- "local:///app/jars/lz4-java-1.7.1.jar"
- "local:///app/jars/scala-library-2.12.15.jar"
- "local:///app/jars/slf4j-api-1.7.30.jar"
- "local:///app/jars/snappy-java-1.1.8.1.jar"
- "local:///app/jars/spark-sql-kafka-0-10_2.12-3.3.2.jar"
- "local:///app/jars/spark-tags_2.12-3.3.2.jar"
- "local:///app/jars/spark-token-provider-kafka-0-10_2.12-3.3.2.jar"
- "local:///app/jars/iceberg-spark-runtime-3.3_2.12-1.0.0.jar"
- "local:///app/jars/hadoop-aws-3.3.2.jar"
- "local:///app/jars/aws-java-sdk-bundle-1.11.1026.jar"
- "local:///app/jars/wildfly-openssl-1.0.7.Final.jar"
- "local:///app/jars/parquet-avro-1.12.3.jar"
sparkConf:
"spark.app.name": "KafkaToIceberg"
"spark.jars.repositories": "https://repo1.maven.org/maven2/"
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
"spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.local.type": "hadoop"
"spark.sql.catalog.local.warehouse": "s3a://__MY_BUCKET_NAME__/iceberg/warehouse/" # Replace bucket name with your S3 bucket name: s3_bucket_id_iceberg_bucket
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
"spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
"spark.sql.warehouse.dir": "s3a://__MY_BUCKET_NAME__/iceberg/warehouse/" # Replace bucket name with your S3 bucket name: s3_bucket_id_iceberg_bucket
"spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet"
"spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics"
"spark.metrics.conf.master.sink.prometheusServlet.path": "/metrics/master"
"spark.metrics.conf.applications.sink.prometheusServlet.path": "/metrics/applications"
"spark.ui.prometheus.enabled": "true"
"spark.ui.prometheus.port": "4040"
restartPolicy:
type: OnFailure
onFailureRetries: 2
onFailureRetryInterval: 10
onSubmissionFailureRetries: 3
onSubmissionFailureRetryInterval: 20
dynamicAllocation:
enabled: true
initialExecutors: 3
minExecutors: 3
maxExecutors: 10
driver:
cores: 1
coreLimit: "1200m"
memory: "1024m"
labels:
version: "3.3.2"
app: spark
annotations:
prometheus.io/scrape: 'true'
prometheus.io/path: /metrics
prometheus.io/port: '4040'
serviceAccount: spark-team-a
nodeSelector:
NodeGroupType: "SparkComputeOptimized"
tolerations:
- key: "spark-compute-optimized"
operator: "Exists"
effect: "NoSchedule"
env:
- name: S3_BUCKET_NAME
value: "__MY_BUCKET_NAME__" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket
- name: KAFKA_ADDRESS
value: "__MY_KAFKA_BROKERS_ADRESS__" # Replace with your Kafka brokers address: bootstrap_brokers
# value: "b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092"
executor:
cores: 2
memory: "1024m"
labels:
version: "3.3.2"
app: spark
annotations:
prometheus.io/scrape: 'true'
prometheus.io/path: /metrics
prometheus.io/port: '4040'
serviceAccount: spark-team-a
nodeSelector:
NodeGroupType: "SparkComputeOptimized"
tolerations:
- key: "spark-compute-optimized"
operator: "Exists"
effect: "NoSchedule"
env:
- name: S3_BUCKET_NAME
value: "__MY_BUCKET_NAME__" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket
- name: KAFKA_ADDRESS
value: "__MY_KAFKA_BROKERS_ADRESS__" # Replace with your Kafka brokers address: bootstrap_brokers
# value: "b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092"
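
Once this SparkApplication is running and committing data, one way to sanity-check the pipeline is to query the Iceberg table from an interactive PySpark session configured with the same catalog settings as app.py. The following is a minimal sketch, assuming the same bucket layout and that the Iceberg and Hadoop packages can be resolved from Maven:

import os
from pyspark.sql import SparkSession

s3_bucket_name = os.getenv("S3_BUCKET_NAME", "my-iceberg-data-bucket")

# Mirror the catalog configuration used by the streaming job in app.py.
spark = (
    SparkSession.builder
    .appName("IcebergSanityCheck")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,"
            "org.apache.hadoop:hadoop-aws:3.3.2")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", f"s3a://{s3_bucket_name}/iceberg/warehouse/")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    .getOrCreate()
)

# The row count grows as the streaming query commits new Iceberg snapshots.
spark.sql("SELECT COUNT(*) AS record_count FROM local.my_table").show()

# Spot-check a few parsed alerts.
spark.sql("SELECT id, timestamp, alert_type, severity FROM local.my_table LIMIT 10").show(truncate=False)
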
2 changes: 2 additions & 0 deletions streaming/spark-streaming/examples/consumer/requirements.txt
@@ -0,0 +1,2 @@
py4j==0.10.9.5
pyspark==3.3.2
28 changes: 28 additions & 0 deletions streaming/spark-streaming/examples/docker-compose.yaml
@@ -0,0 +1,28 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9093:9093" # Added a new port for the external listener
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093 # Internal listener on 9092, host-facing listener on 9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 # Advertise localhost:9093 so clients on the host can connect
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
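
The PLAINTEXT_HOST listener on port 9093 is what lets clients running on the host machine (outside the compose network) reach the broker, while containers inside the network keep using kafka:9092. As a quick local smoke test, a small consumer such as the sketch below, assuming the kafka-python package is installed on the host, can tail the security-topic topic:

import json
from kafka import KafkaConsumer  # assumes the kafka-python package

# Connect through the host-facing PLAINTEXT_HOST listener published on port 9093.
consumer = KafkaConsumer(
    "security-topic",
    bootstrap_servers="localhost:9093",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10s of silence
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    print(message.partition, message.offset, message.value)
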
52 changes: 52 additions & 0 deletions streaming/spark-streaming/examples/producer/00_deployment.yaml
@@ -0,0 +1,52 @@
---
# This is the producer deployment; adjust the number of replicas to control how much data is produced.
# You will need to change __MY_AWS_REGION__, __MY_KAFKA_BROKERS__, and __MY_PRODUCER_ROLE_ARN__ to match your environment.
apiVersion: v1
kind: ServiceAccount
metadata:
name: producer-sa
annotations:
eks.amazonaws.com/role-arn: __MY_PRODUCER_ROLE_ARN__ # Replace with your producer role ARN: producer_iam_role_arn
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: producer-deployment
spec:
  replicas: 100 # Each replica runs one producer; scale this to increase or decrease load
selector:
matchLabels:
app: producer
template:
metadata:
labels:
app: producer
spec:
serviceAccountName: producer-sa
containers:
- name: producer
image: public.ecr.aws/data-on-eks/producer-kafka:1
command: ["python", "app.py"]
env:
- name: RATE_PER_SECOND
value: "100000"
- name: NUM_OF_MESSAGES
value: "10000000"
- name: AWS_REGION
value: "__MY_AWS_REGION__" # Replace with your AWS region
- name: BOOTSTRAP_BROKERS
value: "__MY_KAFKA_BROKERS__" # Replace with your bootstrap brokers: bootstrap_brokers
resources:
limits:
cpu: "2" # Increased CPU limit
memory: "4Gi" # Increased memory limit
requests:
cpu: "1" # Increased CPU request
memory: "2Gi" # Increased memory request
volumeMounts:
- name: shared-volume
mountPath: /mnt
volumes:
- name: shared-volume
emptyDir: {}
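
The producer image's app.py is not shown in this excerpt, but the Deployment's environment variables indicate how it is parameterized: RATE_PER_SECOND throttles throughput, NUM_OF_MESSAGES caps the total number of records, and BOOTSTRAP_BROKERS points at the MSK cluster. A hypothetical, minimal sketch of a producer loop driven by those variables (assuming the kafka-python client) could look like this:

import json
import os
import time
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer  # assumes the kafka-python package

# Read the same environment variables the Deployment sets.
rate_per_second = int(os.getenv("RATE_PER_SECOND", "100"))
num_of_messages = int(os.getenv("NUM_OF_MESSAGES", "1000"))
bootstrap_brokers = os.getenv("BOOTSTRAP_BROKERS", "localhost:9092")

producer = KafkaProducer(
    bootstrap_servers=bootstrap_brokers.split(","),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for i in range(num_of_messages):
    producer.send("security-topic", {
        "id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "alert_type": "unauthorized_access",
        "severity": "high",
        "description": f"synthetic alert {i}",
    })
    time.sleep(1.0 / rate_per_second)  # naive throttle approximating RATE_PER_SECOND

producer.flush()
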
50 changes: 50 additions & 0 deletions streaming/spark-streaming/examples/producer/01_delete_topic.yaml
@@ -0,0 +1,50 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-delete-topic-script
namespace: default
data:
delete_topic.py: |
from kafka.admin import KafkaAdminClient
def delete_topic(bootstrap_servers, topic_name):
"""Delete a Kafka topic."""
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
client.delete_topics([topic_name])
print(f"Topic {topic_name} deleted successfully.")
except Exception as e:
print(f"Failed to delete topic {topic_name}: {e}")
# Configuration
import os
bootstrap_servers = os.getenv('BOOTSTRAP_BROKERS', 'localhost:9092') # Replace with your Kafka broker address
topic_name = os.getenv('TOPIC_NAME', 'security-topic') # Replace with your topic name
# Delete Kafka topic
delete_topic(bootstrap_servers, topic_name)
---
apiVersion: v1
kind: Pod
metadata:
name: kafka-delete-topic-pod
namespace: default
spec:
containers:
- name: delete-topic
    image: public.ecr.aws/data-on-eks/producer-kafka:1 # Reuses the producer image; any Python image with kafka-python installed works
command: ["python", "/scripts/delete_topic.py"]
env:
- name: BOOTSTRAP_BROKERS
value: "__MY_KAFKA_BROKERS__" # Replace with your Kafka broker address
- name: TOPIC_NAME
value: "security-topic" # Replace with your topic name
volumeMounts:
- name: script-volume
mountPath: /scripts
restartPolicy: Never
volumes:
- name: script-volume
configMap:
name: kafka-delete-topic-script
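
The embedded script uses kafka-python's KafkaAdminClient to drop the topic. Should you need the inverse operation, a companion sketch for recreating the topic with the same client might look like the following (partition and replication values are illustrative and should be tuned for your MSK cluster):

import os
from kafka.admin import KafkaAdminClient, NewTopic  # assumes the kafka-python package

bootstrap_servers = os.getenv("BOOTSTRAP_BROKERS", "localhost:9092")
topic_name = os.getenv("TOPIC_NAME", "security-topic")

client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
    # Illustrative sizing; adjust partitions and replication for your cluster.
    client.create_topics([NewTopic(name=topic_name, num_partitions=3, replication_factor=2)])
    print(f"Topic {topic_name} created successfully.")
except Exception as e:
    print(f"Failed to create topic {topic_name}: {e}")
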
14 changes: 14 additions & 0 deletions streaming/spark-streaming/examples/producer/Dockerfile
@@ -0,0 +1,14 @@
# Use an official Python runtime as a parent image
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /usr/src/app

# Copy the local code to the container
COPY . .

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Run app.py when the container launches
CMD ["python", "app.py"]