From 7f6c690a5756801fc04632afa724dd789d827698 Mon Sep 17 00:00:00 2001 From: Ovidiu Valeanu Date: Fri, 5 Apr 2024 19:11:29 +0100 Subject: [PATCH] feat: Modified example to run multiArch Spark jobs in parallel --- schedulers/terraform/argo-workflow/README.md | 2 +- .../terraform/argo-workflow/variables.tf | 2 +- .../sensor-sqs-sparkjobs.yaml | 472 +++++++++--------- .../workflow-examples/sparkapp-graviton.yaml | 49 ++ .../workflow-examples/taxi-trip-execute.sh | 2 +- 5 files changed, 283 insertions(+), 244 deletions(-) create mode 100644 schedulers/terraform/argo-workflow/workflow-examples/sparkapp-graviton.yaml diff --git a/schedulers/terraform/argo-workflow/README.md b/schedulers/terraform/argo-workflow/README.md index b70782fff..4d229576c 100644 --- a/schedulers/terraform/argo-workflow/README.md +++ b/schedulers/terraform/argo-workflow/README.md @@ -78,7 +78,7 @@ Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/ | Name | Description | Type | Default | Required | |------|-------------|------|---------|:--------:| -| [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.27"` | no | +| [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS Cluster version | `string` | `"1.29"` | no | | [eks\_data\_plane\_subnet\_secondary\_cidr](#input\_eks\_data\_plane\_subnet\_secondary\_cidr) | Secondary CIDR blocks. 32766 IPs per Subnet per Subnet/AZ for EKS Node and Pods | `list(string)` |
[
"100.64.0.0/17",
"100.64.128.0/17"
]
| no | | [enable\_amazon\_prometheus](#input\_enable\_amazon\_prometheus) | Enable AWS Managed Prometheus service | `bool` | `true` | no | | [enable\_vpc\_endpoints](#input\_enable\_vpc\_endpoints) | Enable VPC Endpoints | `bool` | `false` | no | diff --git a/schedulers/terraform/argo-workflow/variables.tf b/schedulers/terraform/argo-workflow/variables.tf index 6b8b6d30d..416981f63 100644 --- a/schedulers/terraform/argo-workflow/variables.tf +++ b/schedulers/terraform/argo-workflow/variables.tf @@ -12,7 +12,7 @@ variable "region" { variable "eks_cluster_version" { description = "EKS Cluster version" - default = "1.27" + default = "1.29" type = string } diff --git a/schedulers/terraform/argo-workflow/workflow-examples/sensor-sqs-sparkjobs.yaml b/schedulers/terraform/argo-workflow/workflow-examples/sensor-sqs-sparkjobs.yaml index f80d4e70d..709b828d8 100644 --- a/schedulers/terraform/argo-workflow/workflow-examples/sensor-sqs-sparkjobs.yaml +++ b/schedulers/terraform/argo-workflow/workflow-examples/sensor-sqs-sparkjobs.yaml @@ -3,7 +3,6 @@ # 2/ Replace with your S3 bucket created by this blueprint(Check Terraform outputs) # 3/ execute taxi-trip-execute.sh - apiVersion: argoproj.io/v1alpha1 kind: Sensor metadata: @@ -11,12 +10,7 @@ metadata: namespace: argo-events spec: nodeSelector: - NodeGroupType: SparkComputeOptimized - karpenter.sh/capacity-type: "on-demand" - tolerations: - - key: "spark-compute-optimized" - operator: "Exists" - effect: "NoSchedule" + multiArch: Spark template: serviceAccountName: operate-workflow-sa dependencies: @@ -39,240 +33,236 @@ spec: arguments: {} entrypoint: parallel-jobs nodeSelector: - NodeGroupType: SparkComputeOptimized - karpenter.sh/capacity-type: "on-demand" - tolerations: - - key: "spark-compute-optimized" - operator: "Exists" - effect: "NoSchedule" + multiArch: Spark templates: - - name: parallel-jobs - steps: - - - name: helloworld-job1 - template: whalesay - arguments: - parameters: [{name: message, value: "spark-start!"}] - - - name: spark-operator-pi-job - template: sparkapp-operator-pi - - name: helloworld-job2 - template: whalesay - arguments: - parameters: [{name: message, value: "spark-done!"}] - - - name: spark-operator-taxi-job - template: sparkapp-operator-taxi - - name: whalesay - inputs: - parameters: - - name: message - container: - image: docker/whalesay - command: [cowsay] - args: ["{{inputs.parameters.message}}"] - - name: sparkapp-operator-pi - resource: - action: create - manifest: | - apiVersion: "sparkoperator.k8s.io/v1beta2" - kind: SparkApplication - metadata: - generateName: event-wf-sparkapp-pi-yunikorn- - namespace: spark-team-a - spec: - type: Python - pythonVersion: "3" - mode: cluster - image: "public.ecr.aws/r1l5w1y9/spark-operator:3.2.1-hadoop-3.3.1-java-11-scala-2.12-python-3.8-latest" - imagePullPolicy: Always - mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py" - sparkVersion: "3.1.1" - restartPolicy: - type: Never - driver: - cores: 1 - coreLimit: "1200m" - memory: "4g" - memoryOverhead: "4g" - serviceAccount: spark-team-a - nodeSelector: - NodeGroupType: "SparkComputeOptimized" - karpenter.sh/capacity-type: "on-demand" - tolerations: - - key: "spark-compute-optimized" - operator: "Exists" - effect: "NoSchedule" - labels: - version: 3.1.1 - annotations: - yunikorn.apache.org/schedulingPolicyParameters: "placeholderTimeoutSeconds=30 gangSchedulingStyle=Hard" - yunikorn.apache.org/task-group-name: "spark-driver" - yunikorn.apache.org/task-groups: |- - [{ - "name": "spark-driver", - "minMember": 1, - "minResource": { - "cpu": "1200m", - "memory": "14Gi" - }, - "nodeSelector": { - "NodeGroupType": "SparkComputeOptimized", - "karpenter.sh/capacity-type": "on-demand" - }, - "tolerations": [{"key": "spark-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] - }, - { - "name": "spark-executor", - "minMember": 4, - "minResource": { - "cpu": "1200m", - "memory": "14Gi" - }, - "nodeSelector": { - "NodeGroupType": "SparkComputeOptimized", - "karpenter.sh/capacity-type": "spot" - }, - "tolerations": [{"key": "spark-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] - }] - executor: - cores: 1 - instances: 4 - memory: "4g" - memoryOverhead: "4g" - serviceAccount: spark-team-a - nodeSelector: - NodeGroupType: "SparkComputeOptimized" - karpenter.sh/capacity-type: "spot" - tolerations: - - key: "spark-compute-optimized" - operator: "Exists" - effect: "NoSchedule" - labels: - version: 3.3.1 - annotations: - yunikorn.apache.org/task-group-name: "spark-executor" - - name: sparkapp-operator-taxi - resource: - action: create - manifest: | - apiVersion: "sparkoperator.k8s.io/v1beta2" - kind: SparkApplication - metadata: - generateName: event-wf-sparkapp-taxi-yunikorn- - namespace: spark-team-a - spec: - type: Python - sparkVersion: "3.2.1" - pythonVersion: "3" - mode: cluster - image: "public.ecr.aws/r1l5w1y9/spark-operator:3.2.1-hadoop-3.3.1-java-11-scala-2.12-python-3.8-latest" - imagePullPolicy: IfNotPresent - mainApplicationFile: "s3a:///taxi-trip/scripts/pyspark-taxi-trip.py" # MainFile is the path to a bundled JAR, Python, or R file of the application - arguments: - - "s3a:///taxi-trip/input/" - - "s3a:///taxi-trip/output/" - hadoopConf: - "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider" - "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem" - "mapreduce.fileoutputcommitter.algorithm.version": "2" - sparkConf: - "spark.local.dir": "/data1" - "spark.speculation": "false" - "spark.network.timeout": "2400" - "spark.hadoop.fs.s3a.connection.timeout": "1200000" - "spark.hadoop.fs.s3a.path.style.access": "true" - "spark.hadoop.fs.s3a.connection.maximum": "200" - "spark.hadoop.fs.s3a.fast.upload": "true" - "spark.hadoop.fs.s3a.readahead.range": "256K" - "spark.hadoop.fs.s3a.input.fadvise": "random" - "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem" - # Spark Event logs - "spark.eventLog.enabled": "true" - "spark.eventLog.dir": "s3a:///spark-event-logs" - "spark.eventLog.rolling.enabled": "true" - "spark.eventLog.rolling.maxFileSize": "64m" - # "spark.history.fs.eventLog.rolling.maxFilesToRetain": 100 - # Expose Spark metrics for Prometheus - "spark.ui.prometheus.enabled": "true" - "spark.executor.processTreeMetrics.enabled": "true" - "spark.kubernetes.driver.annotation.prometheus.io/scrape": "true" - "spark.kubernetes.driver.annotation.prometheus.io/path": "/metrics/executors/prometheus/" - "spark.kubernetes.driver.annotation.prometheus.io/port": "4040" - "spark.kubernetes.driver.service.annotation.prometheus.io/scrape": "true" - "spark.kubernetes.driver.service.annotation.prometheus.io/path": "/metrics/driver/prometheus/" - "spark.kubernetes.driver.service.annotation.prometheus.io/port": "4040" - "spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet" - "spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics/driver/prometheus/" - "spark.metrics.conf.master.sink.prometheusServlet.path": "/metrics/master/prometheus/" - "spark.metrics.conf.applications.sink.prometheusServlet.path": "/metrics/applications/prometheus/" - restartPolicy: - type: OnFailure - onFailureRetries: 3 - onFailureRetryInterval: 10 - onSubmissionFailureRetries: 5 - onSubmissionFailureRetryInterval: 20 - driver: - cores: 1 - coreLimit: "1200m" - memory: "4g" - memoryOverhead: "4g" - serviceAccount: spark-team-a - nodeSelector: - NodeGroupType: "SparkComputeOptimized" - karpenter.sh/capacity-type: "on-demand" - tolerations: - - key: "spark-compute-optimized" - operator: "Exists" - effect: "NoSchedule" - labels: - version: 3.2.1 - annotations: - yunikorn.apache.org/schedulingPolicyParameters: "placeholderTimeoutSeconds=30 gangSchedulingStyle=Hard" - yunikorn.apache.org/task-group-name: "spark-driver" - # minMember should match with driver and executor instances - # minResource cpu and memory should match with driver and executor cpu and memory - yunikorn.apache.org/task-groups: |- - [{ - "name": "spark-driver", - "minMember": 1, - "minResource": { - "cpu": "1200m", - "memory": "14Gi" - }, - "nodeSelector": { - "NodeGroupType": "SparkComputeOptimized", - "karpenter.sh/capacity-type": "on-demand" - }, - "tolerations": [{"key": "spark-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] - }, - { - "name": "spark-executor", - "minMember": 4, - "minResource": { - "cpu": "1200m", - "memory": "14Gi" + - name: parallel-jobs + steps: + - - name: helloworld-job1 + template: whalesay + arguments: + parameters: + [{ name: message, value: "spark-start!" }] + - - name: spark-operator-pi-job + template: sparkapp-operator-pi + - name: helloworld-job2 + template: whalesay + arguments: + parameters: + [{ name: message, value: "spark-done!" }] + - - name: spark-operator-taxi-job + template: sparkapp-operator-taxi + - name: whalesay + inputs: + parameters: + - name: message + container: + image: docker/whalesay + command: [cowsay] + args: ["{{inputs.parameters.message}}"] + nodeSelector: + NodeGroupType: "SparkComputeOptimized" + karpenter.sh/capacity-type: "spot" + tolerations: + - key: "spark-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + - name: sparkapp-operator-pi + resource: + action: create + manifest: | + apiVersion: "sparkoperator.k8s.io/v1beta2" + kind: SparkApplication + metadata: + generateName: event-wf-sparkapp-pi-yunikorn- + namespace: spark-team-a + spec: + type: Python + pythonVersion: "3" + mode: cluster + image: "public.ecr.aws/r1l5w1y9/spark-operator:3.2.1-hadoop-3.3.1-java-11-scala-2.12-python-3.8-latest" + imagePullPolicy: Always + mainApplicationFile: "local:///opt/spark/examples/src/main/python/pi.py" + sparkVersion: "3.3.1" + restartPolicy: + type: Never + driver: + cores: 1 + coreLimit: "1200m" + memory: "4g" + memoryOverhead: "4g" + serviceAccount: spark-team-a + nodeSelector: + NodeGroupType: "SparkComputeOptimized" + karpenter.sh/capacity-type: "on-demand" + tolerations: + - key: "spark-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + labels: + version: 3.3.1 + annotations: + yunikorn.apache.org/schedulingPolicyParameters: "placeholderTimeoutSeconds=30 gangSchedulingStyle=Hard" + yunikorn.apache.org/task-group-name: "spark-driver" + yunikorn.apache.org/task-groups: |- + [{ + "name": "spark-driver", + "minMember": 1, + "minResource": { + "cpu": "1200m", + "memory": "14Gi" + }, + "nodeSelector": { + "NodeGroupType": "SparkComputeOptimized", + "karpenter.sh/capacity-type": "on-demand" + }, + "tolerations": [{"key": "spark-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] }, - "nodeSelector": { - "NodeGroupType": "SparkComputeOptimized", - "karpenter.sh/capacity-type": "spot" + { + "name": "spark-executor", + "minMember": 4, + "minResource": { + "cpu": "1200m", + "memory": "14Gi" + }, + "nodeSelector": { + "NodeGroupType": "SparkComputeOptimized", + "karpenter.sh/capacity-type": "spot" + }, + "tolerations": [{"key": "spark-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] + }] + executor: + cores: 1 + instances: 4 + memory: "4g" + memoryOverhead: "4g" + serviceAccount: spark-team-a + nodeSelector: + NodeGroupType: "SparkComputeOptimized" + karpenter.sh/capacity-type: "spot" + tolerations: + - key: "spark-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + labels: + version: 3.3.1 + annotations: + yunikorn.apache.org/task-group-name: "spark-executor" + - name: sparkapp-operator-taxi + resource: + action: create + manifest: | + apiVersion: "sparkoperator.k8s.io/v1beta2" + kind: SparkApplication + metadata: + generateName: event-wf-sparkapp-taxi-yunikorn- + namespace: spark-team-a + spec: + type: Python + sparkVersion: "3.3.1" + pythonVersion: "3" + mode: cluster + image: "public.ecr.aws/data-on-eks/spark3.3.1-hadoop3.2-aws-java-sdk-bundle-1.12.647" + imagePullPolicy: IfNotPresent + mainApplicationFile: "s3a:///taxi-trip/scripts/pyspark-taxi-trip.py" # MainFile is the path to a bundled JAR, Python, or R file of the application + arguments: + - "s3a:///taxi-trip/input/" + - "s3a:///taxi-trip/output/" + hadoopConf: + "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider" + "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem" + "mapreduce.fileoutputcommitter.algorithm.version": "2" + sparkConf: + "spark.local.dir": "/data1" + "spark.speculation": "false" + "spark.network.timeout": "2400" + "spark.hadoop.fs.s3a.connection.timeout": "1200000" + "spark.hadoop.fs.s3a.path.style.access": "true" + "spark.hadoop.fs.s3a.connection.maximum": "200" + "spark.hadoop.fs.s3a.fast.upload": "true" + "spark.hadoop.fs.s3a.readahead.range": "256K" + "spark.hadoop.fs.s3a.input.fadvise": "random" + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem" + # Spark Event logs + "spark.eventLog.enabled": "true" + "spark.eventLog.dir": "s3a:///spark-event-logs" + "spark.eventLog.rolling.enabled": "true" + "spark.eventLog.rolling.maxFileSize": "64m" + # "spark.history.fs.eventLog.rolling.maxFilesToRetain": 100 + # Expose Spark metrics for Prometheus + "spark.ui.prometheus.enabled": "true" + "spark.executor.processTreeMetrics.enabled": "true" + "spark.kubernetes.driver.annotation.prometheus.io/scrape": "true" + "spark.kubernetes.driver.annotation.prometheus.io/path": "/metrics/executors/prometheus/" + "spark.kubernetes.driver.annotation.prometheus.io/port": "4040" + "spark.kubernetes.driver.service.annotation.prometheus.io/scrape": "true" + "spark.kubernetes.driver.service.annotation.prometheus.io/path": "/metrics/driver/prometheus/" + "spark.kubernetes.driver.service.annotation.prometheus.io/port": "4040" + "spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet" + "spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics/driver/prometheus/" + "spark.metrics.conf.master.sink.prometheusServlet.path": "/metrics/master/prometheus/" + "spark.metrics.conf.applications.sink.prometheusServlet.path": "/metrics/applications/prometheus/" + restartPolicy: + type: OnFailure + onFailureRetries: 3 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 5 + onSubmissionFailureRetryInterval: 20 + driver: + cores: 1 + coreLimit: "1200m" + memory: "4g" + memoryOverhead: "4g" + serviceAccount: spark-team-a + nodeSelector: + NodeGroupType: "SparkGravitonComputeOptimized" + karpenter.sh/capacity-type: "on-demand" + tolerations: + - key: "spark-graviton-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + labels: + version: 3.3.1 + annotations: + yunikorn.apache.org/schedulingPolicyParameters: "placeholderTimeoutSeconds=30 gangSchedulingStyle=Hard" + yunikorn.apache.org/task-group-name: "spark-driver" + # minMember should match with driver and executor instances + # minResource cpu and memory should match with driver and executor cpu and memory + yunikorn.apache.org/task-groups: |- + [{ + "name": "spark-driver", + "minMember": 1, + "nodeSelector": { + "NodeGroupType": "SparkGravitonComputeOptimized", + "karpenter.sh/capacity-type": "on-demand" + }, + "tolerations": [{"key": "spark-graviton-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] }, - "tolerations": [{"key": "spark-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] - }] - executor: - podSecurityContext: - fsGroup: 185 - cores: 1 - coreLimit: "1200m" - instances: 4 - memory: "4g" - memoryOverhead: "4g" - serviceAccount: spark-team-a - labels: - version: 3.2.1 - annotations: - yunikorn.apache.org/task-group-name: "spark-executor" - nodeSelector: - NodeGroupType: "SparkComputeOptimized" - karpenter.sh/capacity-type: "spot" - tolerations: - - key: "spark-compute-optimized" - operator: "Exists" - effect: "NoSchedule" + { + "name": "spark-executor", + "minMember": 4, + "nodeSelector": { + "NodeGroupType": "SparkGravitonComputeOptimized", + "karpenter.sh/capacity-type": "spot" + }, + "tolerations": [{"key": "spark-graviton-compute-optimized", "operator": "Exists", "effect": "NoSchedule"}] + }] + executor: + podSecurityContext: + fsGroup: 185 + cores: 1 + coreLimit: "1200m" + instances: 4 + memory: "4g" + memoryOverhead: "4g" + serviceAccount: spark-team-a + labels: + version: 3.3.1 + annotations: + yunikorn.apache.org/task-group-name: "spark-executor" + nodeSelector: + NodeGroupType: "SparkGravitonComputeOptimized" + karpenter.sh/capacity-type: "spot" + tolerations: + - key: "spark-graviton-compute-optimized" + operator: "Exists" + effect: "NoSchedule" diff --git a/schedulers/terraform/argo-workflow/workflow-examples/sparkapp-graviton.yaml b/schedulers/terraform/argo-workflow/workflow-examples/sparkapp-graviton.yaml new file mode 100644 index 000000000..14d3dad80 --- /dev/null +++ b/schedulers/terraform/argo-workflow/workflow-examples/sparkapp-graviton.yaml @@ -0,0 +1,49 @@ +apiVersion: "sparkoperator.k8s.io/v1beta2" +kind: SparkApplication +metadata: + name: pyspark-pi-karpenter-compute + namespace: spark-team-a +spec: + type: Python + pythonVersion: "3" + mode: cluster + image: "public.ecr.aws/data-on-eks/spark3.3.1-hadoop3.2-aws-java-sdk-bundle-1.12.647" + imagePullPolicy: Always + mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py + sparkVersion: "3.3.1" + restartPolicy: + type: OnFailure + onFailureRetries: 1 + onFailureRetryInterval: 10 + onSubmissionFailureRetries: 5 + onSubmissionFailureRetryInterval: 20 + driver: + cores: 1 + coreLimit: "1200m" + memory: "4g" + labels: + version: 3.3.1 + serviceAccount: spark-team-a + # Using Karpenter provisioner nodeSelectors and tolerations + nodeSelector: + NodeGroupType: SparkGravitonComputeOptimized + karpenter.sh/capacity-type: "on-demand" + tolerations: + - key: "spark-graviton-compute-optimized" + operator: "Exists" + effect: "NoSchedule" + executor: + cores: 1 + instances: 4 + memory: "4g" + serviceAccount: spark-team-a + labels: + version: 3.3.1 + # Using Karpenter provisioner nodeSelectors and tolerations + nodeSelector: + NodeGroupType: SparkGravitonComputeOptimized + karpenter.sh/capacity-type: "spot" + tolerations: + - key: "spark-graviton-compute-optimized" + operator: "Exists" + effect: "NoSchedule" diff --git a/schedulers/terraform/argo-workflow/workflow-examples/taxi-trip-execute.sh b/schedulers/terraform/argo-workflow/workflow-examples/taxi-trip-execute.sh index a281fbc67..f113d7b8e 100755 --- a/schedulers/terraform/argo-workflow/workflow-examples/taxi-trip-execute.sh +++ b/schedulers/terraform/argo-workflow/workflow-examples/taxi-trip-execute.sh @@ -31,7 +31,7 @@ aws s3 cp pyspark-taxi-trip.py s3://${s3_bucket}/taxi-trip/scripts/ --region ${r wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet -O "input/yellow_tripdata_2022-0.parquet" # Making duplicate copies to increase the size of the data. -max=100 +max=5 for (( i=1; i <= $max; ++i )) do cp -rf "input/yellow_tripdata_2022-0.parquet" "input/yellow_tripdata_2022-${i}.parquet"