diff --git a/README.md b/README.md old mode 100755 new mode 100644 index e4b06c80b..3c5449bc3 --- a/README.md +++ b/README.md @@ -74,4 +74,4 @@ This library is licensed under the Apache 2.0 License. ## 🙌 Community We welcome all individuals who are enthusiastic about data on Kubernetes to become a part of this open source community. Your contributions and participation are invaluable to the success of this project. -Built with ❤️ at AWS. +Built with ❤️ at AWS. \ No newline at end of file diff --git a/streaming/emr-eks-flink/README.md b/streaming/emr-eks-flink/README.md new file mode 100644 index 000000000..52710bdc0 --- /dev/null +++ b/streaming/emr-eks-flink/README.md @@ -0,0 +1,140 @@ +# EMR-EKS-Flink Blueprint + +Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/blueprints/streaming/emr-eks-flink) to deploy this pattern and run sample tests. + + +## Requirements + +| Name | Version | +|------|---------| +| [terraform](#requirement\_terraform) | >= 1.0.0 | +| [aws](#requirement\_aws) | >= 3.72 | +| [helm](#requirement\_helm) | >= 2.13.0 | +| [kubectl](#requirement\_kubectl) | >= 1.14 | +| [kubernetes](#requirement\_kubernetes) | >= 2.10 | +| [random](#requirement\_random) | 3.3.2 | + +## Providers + +| Name | Version | +|------|---------| +| [aws](#provider\_aws) | >= 3.72 | +| [aws.ecr](#provider\_aws.ecr) | >= 3.72 | +| [kubernetes](#provider\_kubernetes) | >= 2.10 | + +## Modules + +| Name | Source | Version | +|------|--------|---------| +| [ebs\_csi\_driver\_irsa](#module\_ebs\_csi\_driver\_irsa) | terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks | ~> 5.20 | +| [eks](#module\_eks) | terraform-aws-modules/eks/aws | ~> 19.15 | +| [eks\_blueprints\_addons](#module\_eks\_blueprints\_addons) | aws-ia/eks-blueprints-addons/aws | ~> 1.2 | +| [eks\_data\_addons](#module\_eks\_data\_addons) | aws-ia/eks-data-addons/aws | ~> 1.30 | +| [flink\_irsa\_jobs](#module\_flink\_irsa\_jobs) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 | +| [flink\_irsa\_operator](#module\_flink\_irsa\_operator) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 | +| [s3\_bucket](#module\_s3\_bucket) | terraform-aws-modules/s3-bucket/aws | ~> 3.0 | +| [vpc](#module\_vpc) | terraform-aws-modules/vpc/aws | 5.5.1 | + +## Resources + +| Name | Type | +|------|------| +| [aws_cloudwatch_log_group.flink_team_a](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource | +| [aws_iam_policy.flink](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | +| [aws_s3_object.checkpoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_s3_object.jobmanager](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_s3_object.logs](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_s3_object.savepoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [kubernetes_namespace_v1.flink_team_a](https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs/resources/namespace_v1) | resource | +| [aws_ami.x86](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ami) | data source | +| [aws_availability_zones.available](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/availability_zones) | data source | +| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source | +| [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source | +| [aws_eks_cluster_auth.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/eks_cluster_auth) | data source | +| [aws_iam_policy_document.flink_sample_job](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS version for the cluster | `string` | `"1.28"` | no | +| [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"emr-eks-flink"` | no | +| [region](#input\_region) | Region for deployment | `string` | `"us-west-2"` | no | + +## Outputs + +| Name | Description | +|------|-------------| +| [configure\_kubectl](#output\_configure\_kubectl) | Configure kubectl: make sure you're logged in with the correct AWS profile and run the following command to update your kubeconfig | +| [flink\_job\_execution\_role\_arn](#output\_flink\_job\_execution\_role\_arn) | IAM linked role for the flink job | +| [flink\_operator\_bucket](#output\_flink\_operator\_bucket) | S3 bucket name for Flink operator data,logs,checkpoint and savepoint | +| [flink\_operator\_role\_arn](#output\_flink\_operator\_role\_arn) | IAM linked role for the flink operator | + + +## Requirements + +| Name | Version | +|------|---------| +| [terraform](#requirement\_terraform) | >= 1.0.0 | +| [aws](#requirement\_aws) | >= 3.72 | +| [helm](#requirement\_helm) | >= 2.13.0 | +| [kubectl](#requirement\_kubectl) | >= 1.14 | +| [kubernetes](#requirement\_kubernetes) | >= 2.10 | +| [random](#requirement\_random) | 3.3.2 | + +## Providers + +| Name | Version | +|------|---------| +| [aws](#provider\_aws) | 5.46.0 | +| [aws.ecr](#provider\_aws.ecr) | 5.46.0 | +| [kubernetes](#provider\_kubernetes) | 2.29.0 | + +## Modules + +| Name | Source | Version | +|------|--------|---------| +| [ebs\_csi\_driver\_irsa](#module\_ebs\_csi\_driver\_irsa) | terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks | ~> 5.20 | +| [eks](#module\_eks) | terraform-aws-modules/eks/aws | ~> 19.15 | +| [eks\_blueprints\_addons](#module\_eks\_blueprints\_addons) | aws-ia/eks-blueprints-addons/aws | ~> 1.2 | +| [eks\_data\_addons](#module\_eks\_data\_addons) | aws-ia/eks-data-addons/aws | ~> 1.30 | +| [flink\_irsa\_jobs](#module\_flink\_irsa\_jobs) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 | +| [flink\_irsa\_operator](#module\_flink\_irsa\_operator) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 | +| [s3\_bucket](#module\_s3\_bucket) | terraform-aws-modules/s3-bucket/aws | ~> 3.0 | +| [vpc](#module\_vpc) | terraform-aws-modules/vpc/aws | 5.5.1 | + +## Resources + +| Name | Type | +|------|------| +| [aws_cloudwatch_log_group.flink_team_a](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource | +| [aws_iam_policy.flink](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource | +| [aws_s3_object.checkpoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_s3_object.jobmanager](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_s3_object.logs](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [aws_s3_object.savepoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource | +| [kubernetes_namespace_v1.flink_team_a](https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs/resources/namespace_v1) | resource | +| [aws_ami.x86](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ami) | data source | +| [aws_availability_zones.available](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/availability_zones) | data source | +| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source | +| [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source | +| [aws_eks_cluster_auth.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/eks_cluster_auth) | data source | +| [aws_iam_policy_document.flink_sample_job](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source | + +## Inputs + +| Name | Description | Type | Default | Required | +|------|-------------|------|---------|:--------:| +| [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS version for the cluster | `string` | `"1.28"` | no | +| [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"emr-eks-flink"` | no | +| [region](#input\_region) | Region for deployment | `string` | `"us-west-2"` | no | + +## Outputs + +| Name | Description | +|------|-------------| +| [configure\_kubectl](#output\_configure\_kubectl) | Configure kubectl: make sure you're logged in with the correct AWS profile and run the following command to update your kubeconfig | +| [flink\_job\_execution\_role\_arn](#output\_flink\_job\_execution\_role\_arn) | IAM linked role for the flink job | +| [flink\_operator\_bucket](#output\_flink\_operator\_bucket) | S3 bucket name for Flink operator data,logs,checkpoint and savepoint | +| [flink\_operator\_role\_arn](#output\_flink\_operator\_role\_arn) | IAM linked role for the flink operator | + diff --git a/streaming/emr-eks-flink/addons.tf b/streaming/emr-eks-flink/addons.tf new file mode 100644 index 000000000..0c884dfba --- /dev/null +++ b/streaming/emr-eks-flink/addons.tf @@ -0,0 +1,242 @@ +#--------------------------------------------------------------- +# IRSA for EBS CSI Driver +#--------------------------------------------------------------- +module "ebs_csi_driver_irsa" { + source = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks" + version = "~> 5.20" + role_name_prefix = format("%s-%s", local.name, "ebs-csi-driver") + attach_ebs_csi_policy = true + oidc_providers = { + main = { + provider_arn = module.eks.oidc_provider_arn + namespace_service_accounts = ["kube-system:ebs-csi-controller-sa"] + } + } + tags = local.tags +} +#--------------------------------------------------------------- +# EKS Blueprints Kubernetes Addons +#--------------------------------------------------------------- +module "eks_blueprints_addons" { + source = "aws-ia/eks-blueprints-addons/aws" + version = "~> 1.2" + + cluster_name = module.eks.cluster_name + cluster_endpoint = module.eks.cluster_endpoint + cluster_version = module.eks.cluster_version + oidc_provider_arn = module.eks.oidc_provider_arn + + #--------------------------------------- + # Amazon EKS Managed Add-ons + #--------------------------------------- + eks_addons = { + aws-ebs-csi-driver = { + service_account_role_arn = module.ebs_csi_driver_irsa.iam_role_arn + } + coredns = { + preserve = true + } + vpc-cni = { + preserve = true + } + kube-proxy = { + preserve = true + } + } + + #--------------------------------------- + # Install cert-manager + #--------------------------------------- + enable_cert_manager = true + cert_manager = { + set_values = [ + { + name = "extraArgs[0]" + value = "--enable-certificate-owner-ref=false" + }, + ] + } + + #--------------------------------------- + # Metrics Server + #--------------------------------------- + enable_metrics_server = true + metrics_server = { + values = [templatefile("${path.module}/helm-values/metrics-server-values.yaml", {})] + } + + #--------------------------------------- + # Cluster Autoscaler + #--------------------------------------- + enable_cluster_autoscaler = true + cluster_autoscaler = { + create_role = true + values = [templatefile("${path.module}/helm-values/cluster-autoscaler-values.yaml", { + aws_region = var.region, + eks_cluster_id = module.eks.cluster_name + })] + } + + #--------------------------------------- + # Karpenter Autoscaler for EKS Cluster + #--------------------------------------- + enable_karpenter = true + karpenter_enable_spot_termination = true + karpenter = { + chart_version = "v0.34.0" + repository_username = data.aws_ecrpublic_authorization_token.token.user_name + repository_password = data.aws_ecrpublic_authorization_token.token.password + } + karpenter_node = { + iam_role_name = "${local.name}-karpenter-node" + iam_role_use_name_prefix = false + iam_role_additional_policies = { + AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore" + } + } + + #--------------------------------------- + # CloudWatch metrics for EKS + #--------------------------------------- + enable_aws_cloudwatch_metrics = true + aws_cloudwatch_metrics = { + values = [templatefile("${path.module}/helm-values/aws-cloudwatch-metrics-values.yaml", {})] + } + +} + +#--------------------------------------------------------------- +# Data on EKS Kubernetes Addons +#--------------------------------------------------------------- +module "eks_data_addons" { + depends_on = [module.flink_irsa_jobs, module.flink_irsa_operator] + + source = "aws-ia/eks-data-addons/aws" + version = "~> 1.30" # ensure to update this to the latest/desired version + oidc_provider_arn = module.eks.oidc_provider_arn + + #--------------------------------------------------------------- + # EMR Flink operator + #--------------------------------------------------------------- + enable_emr_flink_operator = true + emr_flink_operator_helm_config = { + repository = "oci://public.ecr.aws/emr-on-eks" + operatorExecutionRoleArn = module.flink_irsa_operator.iam_role_arn + } + + #--------------------------------------------------------------- + # Karpenter nodepools + #--------------------------------------------------------------- + enable_karpenter_resources = true + karpenter_resources_helm_config = { + flink-compute-optimized = { + values = [ + <<-EOT + name: flink-compute-optimized + + clusterName: ${module.eks.cluster_name} + ec2NodeClass: + + karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]} + subnetSelectorTerms: + tags: + Name: "${module.eks.cluster_name}-private*" + securityGroupSelectorTerms: + tags: + Name: ${module.eks.cluster_name}-node + instanceStorePolicy: RAID0 + + nodePool: + + labels: + - type: karpenter + - NodeGroupType: FlinkComputeOptimized + - multiArch: Flink + nodeClassRef: + name: flink-compute-optimized + requirements: + - key: "karpenter.sh/capacity-type" + operator: In + values: ["spot","on-demand"] + - key: "kubernetes.io/arch" + operator: In + values: ["amd64"] + - key: "karpenter.k8s.aws/instance-category" + operator: In + values: ["c"] + - key: "karpenter.k8s.aws/instance-family" + operator: In + values: ["c5d"] + - key: "karpenter.k8s.aws/instance-cpu" + operator: In + values: ["4", "8", "16", "36"] + - key: "karpenter.k8s.aws/instance-hypervisor" + operator: In + values: ["nitro"] + - key: "karpenter.k8s.aws/instance-generation" + operator: Gt + values: ["2"] + + limits: + cpu: 1000 + disruption: + consolidationPolicy: WhenEmpty + consolidateAfter: 30s + expireAfter: 720h + weight: 100 + EOT + ] + } + flink-graviton-memory-optimized = { + values = [ + <<-EOT + name: flink-graviton-memory-optimized + clusterName: ${module.eks.cluster_name} + ec2NodeClass: + karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]} + subnetSelectorTerms: + tags: + Name: "${module.eks.cluster_name}-private*" + securityGroupSelectorTerms: + tags: + Name: ${module.eks.cluster_name}-node + instanceStorePolicy: RAID0 + nodePool: + labels: + - type: karpenter + - NodeGroupType: FlinkGravitonMemoryOptimized + - multiArch: Flink + requirements: + - key: "karpenter.sh/capacity-type" + operator: In + values: ["spot", "on-demand"] + - key: "kubernetes.io/arch" + operator: In + values: ["arm64"] + - key: "karpenter.k8s.aws/instance-category" + operator: In + values: ["r"] + - key: "karpenter.k8s.aws/instance-family" + operator: In + values: ["r6gd"] + - key: "karpenter.k8s.aws/instance-cpu" + operator: In + values: ["4", "8", "16", "32"] + - key: "karpenter.k8s.aws/instance-hypervisor" + operator: In + values: ["nitro"] + - key: "karpenter.k8s.aws/instance-generation" + operator: Gt + values: ["2"] + limits: + cpu: 1000 + disruption: + consolidationPolicy: WhenEmpty + consolidateAfter: 30s + expireAfter: 720h + weight: 50 + EOT + ] + } + } +} diff --git a/streaming/emr-eks-flink/cleanup.sh b/streaming/emr-eks-flink/cleanup.sh new file mode 100755 index 000000000..da1fb7c16 --- /dev/null +++ b/streaming/emr-eks-flink/cleanup.sh @@ -0,0 +1,45 @@ +#!/bin/bash +set -o errexit +set -o pipefail + +targets=( + "module.eks" + "module.vpc" +) + +#------------------------------------------- +# Helpful to delete the stuck in "Terminating" namespaces +# Rerun the cleanup.sh script to detect and delete the stuck resources +#------------------------------------------- +terminating_namespaces=$(kubectl get namespaces --field-selector status.phase=Terminating -o json | jq -r '.items[].metadata.name') + +# If there are no terminating namespaces, exit the script +if [[ -z $terminating_namespaces ]]; then + echo "No terminating namespaces found" +fi + +for ns in $terminating_namespaces; do + echo "Terminating namespace: $ns" + kubectl get namespace $ns -o json | sed 's/"kubernetes"//' | kubectl replace --raw "/api/v1/namespaces/$ns/finalize" -f - +done + +for target in "${targets[@]}" +do + terraform destroy -target="$target" -auto-approve + destroy_output=$(terraform destroy -target="$target" -auto-approve 2>&1) + if [[ $? -eq 0 && $destroy_output == *"Destroy complete!"* ]]; then + echo "SUCCESS: Terraform destroy of $target completed successfully" + else + echo "FAILED: Terraform destroy of $target failed" + exit 1 + fi +done + +terraform destroy -auto-approve +destroy_output=$(terraform destroy -auto-approve 2>&1) +if [[ $? -eq 0 && $destroy_output == *"Destroy complete!"* ]]; then + echo "SUCCESS: Terraform destroy of all targets completed successfully" +else + echo "FAILED: Terraform destroy of all targets failed" + exit 1 +fi diff --git a/streaming/emr-eks-flink/examples/cluster-autoscaler/flink-sample-job.yaml b/streaming/emr-eks-flink/examples/cluster-autoscaler/flink-sample-job.yaml new file mode 100644 index 000000000..875200393 --- /dev/null +++ b/streaming/emr-eks-flink/examples/cluster-autoscaler/flink-sample-job.yaml @@ -0,0 +1,64 @@ +# NOTE: Make sure you replace with your S3 Bucket before running this job. +# Replace the with the flink_job_execution_role_arn output. +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-example-cluster-auto-flink + namespace: flink-team-a-ns +spec: + imagePullPolicy: Always + emrReleaseLabel: "emr-7.1.0-flink-latest" + flinkVersion: v1_18 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + # Autotuning parameters + kubernetes.operator.job.autoscaler.autotune.enable: "true" + kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" + kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" + metrics.job.status.enable: TOTAL_TIME + + # Autoscaler parameters + kubernetes.operator.job.autoscaler.enabled: "true" + kubernetes.operator.job.autoscaler.scaling.enabled: "true" + kubernetes.operator.job.autoscaler.stabilization.interval: "5s" + kubernetes.operator.job.autoscaler.metrics.window: "1m" + + jobmanager.scheduler: adaptive + # Replace with s3 bucket in your own account + state.checkpoints.dir: s3:///checkpoints + state.savepoints.dir: s3:///savepoints + + # Replace this execution role ARN with your own + executionRoleArn: + jobManager: + # Replace with s3 bucket in your own account + storageDir: s3:///data/basic-example-app-cluster/jobmanager + resource: + memory: "2048m" + cpu: 1 + + taskManager: + replicas: 2 + resource: + memory: "2048m" + cpu: 1 + + job: + # if you have your job jar in S3 bucket you can use that path as well + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 4 + upgradeMode: savepoint + savepointTriggerNonce: 0 + monitoringConfiguration: + s3MonitoringConfiguration: + logUri: /logs + cloudWatchMonitoringConfiguration: + logGroupName: /aws/emr-flink/flink-team-a + sideCarResources: + limits: + cpuLimit: 500m + memoryLimit: 250Mi + containerLogRotationConfiguration: + rotationSize: 2GB + maxFilesToKeep: "10" diff --git a/streaming/emr-eks-flink/examples/karpenter/basic-example-app-cluster.yaml b/streaming/emr-eks-flink/examples/karpenter/basic-example-app-cluster.yaml new file mode 100644 index 000000000..51992b810 --- /dev/null +++ b/streaming/emr-eks-flink/examples/karpenter/basic-example-app-cluster.yaml @@ -0,0 +1,74 @@ +# NOTE: Make sure you replace with your S3 Bucket before running this job. +# Replace the with the flink_job_execution_role_arn output. +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: basic-example-karpenter-flink + namespace: flink-team-a-ns +spec: + imagePullPolicy: Always + emrReleaseLabel: "emr-7.1.0-flink-latest" + flinkVersion: v1_18 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + # Autotuning parameters + kubernetes.operator.job.autoscaler.autotune.enable: "true" + kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" + kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" + metrics.job.status.enable: TOTAL_TIME + + # Autoscaler parameters + kubernetes.operator.job.autoscaler.enabled: "true" + kubernetes.operator.job.autoscaler.scaling.enabled: "true" + kubernetes.operator.job.autoscaler.stabilization.interval: "5s" + kubernetes.operator.job.autoscaler.metrics.window: "1m" + + jobmanager.scheduler: adaptive + # Replace with s3 bucket in your own account + state.checkpoints.dir: s3:///checkpoints + state.savepoints.dir: s3:///savepoints + + # Replace this execution role ARN with your own + executionRoleArn: <> + + podTemplate: + apiVersion: v1 + kind: Pod + metadata: + name: pod-template + spec: + nodeSelector: + NodeGroupType: "FlinkComputeOptimized" + + jobManager: + # Replace with s3 bucket in your own account + storageDir: s3:///data/basic-example-app-cluster/jobmanager + resource: + memory: "2048m" + cpu: 1 + + taskManager: + replicas: 2 + resource: + memory: "2048m" + cpu: 1 + + job: + # if you have your job jar in S3 bucket you can use that path as well + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 4 + upgradeMode: savepoint + savepointTriggerNonce: 0 + monitoringConfiguration: + s3MonitoringConfiguration: + logUri: /logs + cloudWatchMonitoringConfiguration: + logGroupName: /aws/emr-flink/flink-team-a + sideCarResources: + limits: + cpuLimit: 500m + memoryLimit: 250Mi + containerLogRotationConfiguration: + rotationSize: 2GB + maxFilesToKeep: "10" diff --git a/streaming/emr-eks-flink/examples/karpenter/graviton-example-app-cluster.yaml b/streaming/emr-eks-flink/examples/karpenter/graviton-example-app-cluster.yaml new file mode 100644 index 000000000..720ffdc6f --- /dev/null +++ b/streaming/emr-eks-flink/examples/karpenter/graviton-example-app-cluster.yaml @@ -0,0 +1,75 @@ +# NOTE: Make sure you replace with your S3 Bucket before running this job. +# Replace the with the flink_job_execution_role_arn output. +--- +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: graviton-example-karpenter-flink + namespace: flink-team-a-ns +spec: + imagePullPolicy: Always + emrReleaseLabel: "emr-7.1.0-flink-latest" + flinkVersion: v1_18 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + # Autotuning parameters + kubernetes.operator.job.autoscaler.autotune.enable: "true" + kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" + kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" + metrics.job.status.enable: TOTAL_TIME + + # Autoscaler parameters + kubernetes.operator.job.autoscaler.enabled: "true" + kubernetes.operator.job.autoscaler.scaling.enabled: "true" + kubernetes.operator.job.autoscaler.stabilization.interval: "5s" + kubernetes.operator.job.autoscaler.metrics.window: "1m" + + jobmanager.scheduler: adaptive + # Replace with s3 bucket in your own account + state.checkpoints.dir: s3:///checkpoints + state.savepoints.dir: s3:///savepoints + + # Replace this execution role ARN with your own + executionRoleArn: + + podTemplate: + apiVersion: v1 + kind: Pod + metadata: + name: pod-template + spec: + nodeSelector: + NodeGroupType: "FlinkGravitonMemoryOptimized" + + + jobManager: + # Replace with s3 bucket in your own account + storageDir: s3:///data/basic-example-app-cluster/jobmanager + resource: + memory: "2048m" + cpu: 1 + + taskManager: + replicas: 2 + resource: + memory: "2048m" + cpu: 1 + + job: + # if you have your job jar in S3 bucket you can use that path as well + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 4 + upgradeMode: savepoint + savepointTriggerNonce: 0 + monitoringConfiguration: + s3MonitoringConfiguration: + logUri: /logs + cloudWatchMonitoringConfiguration: + logGroupName: /aws/emr-flink/flink-team-a + sideCarResources: + limits: + cpuLimit: 500m + memoryLimit: 250Mi + containerLogRotationConfiguration: + rotationSize: 2GB + maxFilesToKeep: "10" diff --git a/streaming/emr-eks-flink/flink-team.tf b/streaming/emr-eks-flink/flink-team.tf new file mode 100644 index 000000000..80d157f38 --- /dev/null +++ b/streaming/emr-eks-flink/flink-team.tf @@ -0,0 +1,156 @@ +resource "kubernetes_namespace_v1" "flink_team_a" { + metadata { + name = "${local.flink_team}-ns" + } + timeouts { + delete = "15m" + } +} + +#--------------------------------------------------------------- +# Creates IAM policy for IRSA. Provides IAM permissions for flink pods +#--------------------------------------------------------------- +resource "aws_iam_policy" "flink" { + description = "IAM role policy for flink Job execution" + name = "${local.name}-flink-irsa" + policy = data.aws_iam_policy_document.flink_sample_job.json +} + +#--------------------------------------------------------------- +# IRSA for flink pods for "flink-team-a" +#--------------------------------------------------------------- +module "flink_irsa_jobs" { + source = "aws-ia/eks-blueprints-addon/aws" + version = "~> 1.0" + + # Disable helm release + create_release = false + # IAM role for service account (IRSA) + create_role = true + role_name = "${local.name}-${local.flink_team}" + create_policy = false + role_policies = { + flink_team_a_policy = aws_iam_policy.flink.arn + } + assume_role_condition_test = "StringLike" + oidc_providers = { + this = { + provider_arn = module.eks.oidc_provider_arn + namespace = "${local.flink_team}-ns" + service_account = "emr-containers-sa-*-*-${data.aws_caller_identity.current.account_id}-*" + } + } +} + +#--------------------------------------------------------------- +# IRSA for flink pods for "flink-operator" +#--------------------------------------------------------------- +module "flink_irsa_operator" { + source = "aws-ia/eks-blueprints-addon/aws" + version = "~> 1.0" + + # Disable helm release + create_release = false + # IAM role for service account (IRSA) + create_role = true + role_name = "${local.name}-operator" + create_policy = false + role_policies = { + flink_team_a_policy = aws_iam_policy.flink.arn + } + assume_role_condition_test = "StringLike" + oidc_providers = { + this = { + provider_arn = module.eks.oidc_provider_arn + namespace = "${local.flink_operator}-ns" + service_account = "emr-containers-sa-flink-operator" + } + } +} + +#--------------------------------------------------------------- +# Creates a log group +#--------------------------------------------------------------- +resource "aws_cloudwatch_log_group" "flink_team_a" { + name = "/aws/emr-flink/flink-team-a" + retention_in_days = 7 +} + +#--------------------------------------------------------------- +# Example IAM policy for Flink job execution +#--------------------------------------------------------------- +data "aws_iam_policy_document" "flink_sample_job" { + statement { + sid = "" + effect = "Allow" + resources = ["*"] + actions = [ + "s3:ListBucket", + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject", + "s3:GetBucketLocation", + "s3:GetObjectVersion" + ] + } + statement { + sid = "" + effect = "Allow" + resources = ["*"] + + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:DescribeLogGroups", + "logs:DescribeLogStreams", + "logs:PutLogEvents", + ] + } +} + +#--------------------------------------------------------------- +# S3 bucket for Flink related data,logs and checkpoint +#--------------------------------------------------------------- +module "s3_bucket" { + source = "terraform-aws-modules/s3-bucket/aws" + version = "~> 3.0" + bucket_prefix = "${local.name}-" + attach_require_latest_tls_policy = true + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true + + server_side_encryption_configuration = { + rule = { + apply_server_side_encryption_by_default = { + sse_algorithm = "AES256" + } + } + } + tags = local.tags +} + +resource "aws_s3_object" "checkpoints" { + bucket = module.s3_bucket.s3_bucket_id + key = "checkpoints/" + content_type = "application/x-directory" +} + +resource "aws_s3_object" "savepoints" { + bucket = module.s3_bucket.s3_bucket_id + key = "savepoints/" + content_type = "application/x-directory" +} + +resource "aws_s3_object" "jobmanager" { + bucket = module.s3_bucket.s3_bucket_id + key = "jobmanager/" + content_type = "application/x-directory" +} + +resource "aws_s3_object" "logs" { + bucket = module.s3_bucket.s3_bucket_id + key = "logs/" + content_type = "application/x-directory" +} diff --git a/streaming/emr-eks-flink/helm-values/aws-cloudwatch-metrics-values.yaml b/streaming/emr-eks-flink/helm-values/aws-cloudwatch-metrics-values.yaml new file mode 100644 index 000000000..ae3c41d44 --- /dev/null +++ b/streaming/emr-eks-flink/helm-values/aws-cloudwatch-metrics-values.yaml @@ -0,0 +1,11 @@ +resources: + limits: + cpu: 500m + memory: 2Gi + requests: + cpu: 200m + memory: 1Gi + +# This toleration allows Daemonset pod to be scheduled on any node, regardless of their Taints. +tolerations: + - operator: Exists diff --git a/streaming/emr-eks-flink/helm-values/cluster-autoscaler-values.yaml b/streaming/emr-eks-flink/helm-values/cluster-autoscaler-values.yaml new file mode 100644 index 000000000..5a42794f2 --- /dev/null +++ b/streaming/emr-eks-flink/helm-values/cluster-autoscaler-values.yaml @@ -0,0 +1,25 @@ +autoDiscovery: + clusterName: ${eks_cluster_id} + +awsRegion: ${aws_region} + +cloudProvider: aws + +extraArgs: + aws-use-static-instance-list: true + +# Best practice to update the resource requests and limits for each add-on +resources: + limits: + cpu: 1000m + memory: 1G + requests: + cpu: 200m + memory: 512Mi + +# Best practice to updateStrategy for each add-on +updateStrategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 0 + maxUnavailable: 1 diff --git a/streaming/emr-eks-flink/helm-values/metrics-server-values.yaml b/streaming/emr-eks-flink/helm-values/metrics-server-values.yaml new file mode 100644 index 000000000..026d97a6a --- /dev/null +++ b/streaming/emr-eks-flink/helm-values/metrics-server-values.yaml @@ -0,0 +1,52 @@ +# HA config for metrics-server +image: + repository: registry.k8s.io/metrics-server/metrics-server + pullPolicy: IfNotPresent + +serviceAccount: + create: true + name: metrics-server + +rbac: + create: true + pspEnabled: false + +apiService: + create: true + +podLabels: + k8s-app: metrics-server + +# HA enabled by enabling replicas to 2, updateStrategy and podDisruptionBudget to true +replicas: 2 + +updateStrategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 0 + maxUnavailable: 1 + +podDisruptionBudget: + enabled: true + minAvailable: 1 + +defaultArgs: + - --cert-dir=/tmp + - --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname + - --kubelet-use-node-status-port + - --metric-resolution=15s + +resources: + requests: + cpu: 200m + memory: 512Mi + +affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + k8s-app: metrics-server + namespaces: + - kube-system + topologyKey: kubernetes.io/hostname diff --git a/streaming/emr-eks-flink/install.sh b/streaming/emr-eks-flink/install.sh new file mode 100755 index 000000000..9d43e080b --- /dev/null +++ b/streaming/emr-eks-flink/install.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +read -p "Enter the region: " region +export AWS_DEFAULT_REGION=$region + +# List of Terraform modules to apply in sequence +targets=( + "module.vpc" + "module.eks" + +) + +# Initialize Terraform +terraform init --upgrade + +# Apply modules in sequence +for target in "${targets[@]}" +do + echo "Applying module $target..." + apply_output=$(terraform apply -target="$target" -var="region=$region" -auto-approve 2>&1 | tee /dev/tty) + if [[ ${PIPESTATUS[0]} -eq 0 && $apply_output == *"Apply complete"* ]]; then + echo "SUCCESS: Terraform apply of $target completed successfully" + else + echo "FAILED: Terraform apply of $target failed" + exit 1 + fi +done + +# Final apply to catch any remaining resources +echo "Applying remaining resources..." +apply_output=$(terraform apply -var="region=$region" -auto-approve 2>&1 | tee /dev/tty) +if [[ ${PIPESTATUS[0]} -eq 0 && $apply_output == *"Apply complete"* ]]; then + echo "SUCCESS: Terraform apply of all modules completed successfully" +else + echo "FAILED: Terraform apply of all modules failed" + exit 1 +fi diff --git a/streaming/emr-eks-flink/main.tf b/streaming/emr-eks-flink/main.tf new file mode 100644 index 000000000..b34e44c99 --- /dev/null +++ b/streaming/emr-eks-flink/main.tf @@ -0,0 +1,166 @@ +#create local +locals { + name = var.name + region = var.region + tags = { + Blueprint = local.name + GithubRepo = "github.com/awslabs/data-on-eks" + } + flink_team = "flink-team-a" + flink_operator = "flink-kubernetes-operator" +} + +data "aws_eks_cluster_auth" "this" { + name = module.eks.cluster_name +} + +data "aws_ecrpublic_authorization_token" "token" { + provider = aws.ecr +} + +data "aws_availability_zones" "available" {} + +data "aws_caller_identity" "current" {} + +# This data source can be used to get the latest AMI for Managed Node Groups +data "aws_ami" "x86" { + owners = ["amazon"] + most_recent = true + filter { + name = "name" + values = ["amazon-eks-node-${module.eks.cluster_version}-*"] # Update this for ARM ["amazon-eks-arm64-node-${module.eks.cluster_version}-*"] + } +} + +#--------------------------------------------------------------- +# EKS Cluster +#--------------------------------------------------------------- +module "eks" { + source = "terraform-aws-modules/eks/aws" + version = "~> 19.15" + + cluster_name = local.name + cluster_version = var.eks_cluster_version + cluster_endpoint_private_access = true # if true, Kubernetes API requests within your cluster's VPC (such as node to control plane communication) use the private VPC endpoint + cluster_endpoint_public_access = true # if true, Your cluster API server is accessible from the internet. You can, optionally, limit the CIDR blocks that can access the public endpoint. + vpc_id = module.vpc.vpc_id + subnet_ids = module.vpc.private_subnets + manage_aws_auth_configmap = true + + aws_auth_roles = [ + # We need to add in the Karpenter node IAM role for nodes launched by Karpenter + { + rolearn = module.eks_blueprints_addons.karpenter.node_iam_role_arn + username = "system:node:{{EC2PrivateDNSName}}" + groups = [ + "system:bootstrappers", + "system:nodes", + ] + } + ] + + #--------------------------------------- + # Note: This can further restricted to specific required for each Add-on and your application + #--------------------------------------- + # Extend cluster security group rules + cluster_security_group_additional_rules = { + ingress_nodes_ephemeral_ports_tcp = { + description = "Nodes on ephemeral ports" + protocol = "tcp" + from_port = 1025 + to_port = 65535 + type = "ingress" + source_node_security_group = true + } + } + + # Extend node-to-node security group rules + node_security_group_additional_rules = { + ingress_self_all = { + description = "Node to node all ports/protocols" + protocol = "-1" + from_port = 0 + to_port = 0 + type = "ingress" + self = true + } + egress_all = { + description = "Node all egress" + protocol = "-1" + from_port = 0 + to_port = 0 + type = "egress" + cidr_blocks = ["0.0.0.0/0"] + ipv6_cidr_blocks = ["::/0"] + } + # Allows Control Plane Nodes to talk to Worker nodes on all ports. Added this to simplify the example and further avoid issues with Add-ons communication with Control plane. + # This can be restricted further to specific port based on the requirement for each Add-on e.g., metrics-server 4443, spark-operator 8080, karpenter 8443 etc. + # Change this according to your security requirements if needed + ingress_cluster_to_node_all_traffic = { + description = "Cluster API to Nodegroup all traffic" + protocol = "-1" + from_port = 0 + to_port = 0 + type = "ingress" + source_cluster_security_group = true + } + } + + eks_managed_node_group_defaults = { + iam_role_additional_policies = { + # Not required, but used in the example to access the nodes to inspect mounted volumes + AmazonSSMManagedInstanceCore = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore" + } + } + + eks_managed_node_groups = { + # We recommend to have a MNG to place your critical workloads and add-ons + # Then rely on Karpenter to scale your workloads + # You can also make uses on nodeSelector and Taints/tolerations to spread workloads on MNG or Karpenter provisioners + core_node_group = { + name = "core-node-group" + description = "EKS managed node group example launch template" + ami_id = data.aws_ami.x86.image_id + # This will ensure the bootstrap user data is used to join the node + # By default, EKS managed node groups will not append bootstrap script; + # this adds it back in using the default template provided by the module + # Note: this assumes the AMI provided is an EKS optimized AMI derivative + enable_bootstrap_user_data = true + # Optional - This is to show how you can pass pre bootstrap data + pre_bootstrap_user_data = <<-EOT + echo "Node bootstrap process started by Data on EKS" + EOT + # Optional - Post bootstrap data to verify anything + post_bootstrap_user_data = <<-EOT + echo "Bootstrap complete.Ready to Go!" + EOT + subnet_ids = module.vpc.private_subnets + min_size = 3 + max_size = 9 + desired_size = 3 + force_update_version = true + instance_types = ["m5.xlarge"] + ebs_optimized = true + block_device_mappings = { + xvda = { + device_name = "/dev/xvda" + ebs = { + volume_size = 100 + volume_type = "gp3" + } + } + } + update_config = { + max_unavailable_percentage = 50 + } + labels = { + WorkerType = "ON_DEMAND" + NodeGroupType = "core" + } + tags = { + Name = "core-node-grp", + "karpenter.sh/discovery" = local.name + } + } + } +} diff --git a/streaming/emr-eks-flink/outputs.tf b/streaming/emr-eks-flink/outputs.tf new file mode 100644 index 000000000..c09749ec1 --- /dev/null +++ b/streaming/emr-eks-flink/outputs.tf @@ -0,0 +1,25 @@ +################################################################################ +# EMR Flink operator +################################################################################ +output "flink_job_execution_role_arn" { + value = module.flink_irsa_jobs.iam_role_arn + description = "IAM linked role for the flink job" +} + +output "flink_operator_role_arn" { + value = module.flink_irsa_operator.iam_role_arn + description = "IAM linked role for the flink operator" +} + +output "flink_operator_bucket" { + value = module.s3_bucket.s3_bucket_id + description = "S3 bucket name for Flink operator data,logs,checkpoint and savepoint" +} + +################################################################################ +# EKS Managed Node Group +################################################################################ +output "configure_kubectl" { + description = "Configure kubectl: make sure you're logged in with the correct AWS profile and run the following command to update your kubeconfig" + value = "aws eks --region ${local.region} update-kubeconfig --name ${module.eks.cluster_name}" +} diff --git a/streaming/emr-eks-flink/providers.tf b/streaming/emr-eks-flink/providers.tf new file mode 100644 index 000000000..09aa297b0 --- /dev/null +++ b/streaming/emr-eks-flink/providers.tf @@ -0,0 +1,33 @@ + +provider "aws" { + region = local.region +} + +# ECR always authenticates with `us-east-1` region +# Docs -> https://docs.aws.amazon.com/AmazonECR/latest/public/public-registries.html +provider "aws" { + alias = "ecr" + region = "us-east-1" +} + +provider "kubernetes" { + host = module.eks.cluster_endpoint + cluster_ca_certificate = base64decode(module.eks.cluster_certificate_authority_data) + token = data.aws_eks_cluster_auth.this.token +} + +provider "helm" { + kubernetes { + host = module.eks.cluster_endpoint + cluster_ca_certificate = base64decode(module.eks.cluster_certificate_authority_data) + token = data.aws_eks_cluster_auth.this.token + } +} + +provider "kubectl" { + apply_retry_count = 30 + host = module.eks.cluster_endpoint + cluster_ca_certificate = base64decode(module.eks.cluster_certificate_authority_data) + load_config_file = false + token = data.aws_eks_cluster_auth.this.token +} diff --git a/streaming/emr-eks-flink/variables.tf b/streaming/emr-eks-flink/variables.tf new file mode 100644 index 000000000..e19d5ad69 --- /dev/null +++ b/streaming/emr-eks-flink/variables.tf @@ -0,0 +1,17 @@ +variable "region" { + type = string + default = "us-west-2" + description = "Region for deployment" +} + +variable "name" { + description = "Name of the VPC and EKS Cluster" + default = "emr-eks-flink" + type = string +} + +variable "eks_cluster_version" { + type = string + default = "1.28" + description = "EKS version for the cluster" +} diff --git a/streaming/emr-eks-flink/versions.tf b/streaming/emr-eks-flink/versions.tf new file mode 100644 index 000000000..a18160120 --- /dev/null +++ b/streaming/emr-eks-flink/versions.tf @@ -0,0 +1,33 @@ +terraform { + required_version = ">= 1.0.0" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 3.72" + } + kubernetes = { + source = "hashicorp/kubernetes" + version = ">= 2.10" + } + helm = { + source = "hashicorp/helm" + version = ">= 2.13.0" + } + kubectl = { + source = "gavinbunney/kubectl" + version = ">= 1.14" + } + random = { + source = "hashicorp/random" + version = "3.3.2" + } + } + + # ## Used for end-to-end testing on project; update to suit your needs + # backend "s3" { + # bucket = "doeks-github-actions-e2e-test-state" + # region = "us-west-2" + # key = "e2e/emr-eks-flink/terraform.tfstate" + # } +} diff --git a/streaming/emr-eks-flink/vpc.tf b/streaming/emr-eks-flink/vpc.tf new file mode 100644 index 000000000..38f9ce7b2 --- /dev/null +++ b/streaming/emr-eks-flink/vpc.tf @@ -0,0 +1,26 @@ +#--------------------------------------------------------------- +# Supporting Network Resources +#--------------------------------------------------------------- +# WARNING: This VPC module includes the creation of an Internet Gateway and NAT Gateway, which simplifies cluster deployment and testing, primarily intended for sandbox accounts. +# IMPORTANT: For preprod and prod use cases, it is crucial to consult with your security team and AWS architects to design a private infrastructure solution that aligns with your security requirements +module "vpc" { + source = "terraform-aws-modules/vpc/aws" + version = "5.5.1" + + name = local.name + cidr = "10.0.0.0/16" + azs = slice(data.aws_availability_zones.available.names, 0, 3) + private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"] + public_subnets = ["10.0.4.0/24", "10.0.5.0/24", "10.0.6.0/24"] + enable_nat_gateway = true + single_nat_gateway = true + enable_dns_hostnames = true + public_subnet_tags = { + "kubernetes.io/cluster/${local.name}" = "shared" + "kubernetes.io/role/elb" = 1 + } + private_subnet_tags = { + "kubernetes.io/cluster/${local.name}" = "shared" + "kubernetes.io/role/internal-elb" = 1 + } +} diff --git a/website/docs/blueprints/streaming-platforms/emr-eks-flink.md b/website/docs/blueprints/streaming-platforms/emr-eks-flink.md new file mode 100644 index 000000000..b417bf151 --- /dev/null +++ b/website/docs/blueprints/streaming-platforms/emr-eks-flink.md @@ -0,0 +1,338 @@ +--- +sidebar_position: 3 +title: EMR on EKS with Flink Streaming +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; +import CollapsibleContent from '../../../src/components/CollapsibleContent'; + +:::info +Please note that we are working on adding more features to this blueprint such as Flink examples with multiple connectors, Ingress for WebUI, Grafana dashboards etc. +::: + +## Introduction to Apache Flink +[Apache Flink](https://flink.apache.org/) is an open-source, unified stream processing and batch processing framework that was designed to process large amounts of data. It provides fast, reliable, and scalable data processing with fault tolerance and exactly-once semantics. +Some of the key features of Flink are: +- **Distributed Processing**: Flink is designed to process large volumes of data in a distributed fashion, making it horizontally scalable and fault-tolerant. +- **Stream Processing and Batch Processing**: Flink provides APIs for both stream processing and batch processing. This means you can process data in real-time, as it's being generated, or process data in batches. +- **Fault Tolerance**: Flink has built-in mechanisms for handling node failures, network partitions, and other types of failures. +- **Exactly-once Semantics**: Flink supports exactly-once processing, which ensures that each record is processed exactly once, even in the presence of failures. +- **Low Latency**: Flink's streaming engine is optimized for low-latency processing, making it suitable for use cases that require real-time processing of data. +- **Extensibility**: Flink provides a rich set of APIs and libraries, making it easy to extend and customize to fit your specific use case. + +## Architecture + +Flink Architecture high level design with EKS. + +![Flink Design UI](img/flink-design.png) + +## EMR on EKS Flink Kubernetes Operator +Amazon EMR releases 6.13.0 and higher support Amazon EMR on EKS with Apache Flink, or the ![EMR Flink Kubernetes operator](https://gallery.ecr.aws/emr-on-eks/flink-kubernetes-operator), as a job submission model for Amazon EMR on EKS. With Amazon EMR on EKS with Apache Flink, you can deploy and manage Flink applications with the Amazon EMR release runtime on your own Amazon EKS clusters. Once you deploy the Flink Kubernetes operator in your Amazon EKS cluster, you can directly submit Flink applications with the operator. The operator manages the lifecycle of Flink applications. +1. Running, suspending and deleting applications +2. Stateful and stateless application upgrades +3. Triggering and managing savepoints +4. Handling errors, rolling-back broken upgrades + +In addition to the above features, EMR Flink Kubernetes operator provides the following additional capabilities: +1. Launching Flink application using jars in Amazon S3 +2. Monitoring integration with Amazon S3 and Amazon CloudWatch and container log rotation. +3. Automatically tunes Autoscaler configurations based on historical trends of observed metrics. +4. Faster Flink Job Restart during scaling or Failure Recovery +5. IRSA (IAM Roles for Service Accounts) Native Integration +6. Pyflink support + + +Flink Operator defines two types of Custom Resources(CR) which are the extensions of the Kubernetes API. + + + + + +**FlinkDeployment** +- FlinkDeployment CR defines **Flink Application** and **Session Cluster** deployments. +- Application deployments manage a single job deployment on a dedicated Flink cluster in Application mode. +- Session clusters allows you to run multiple Flink Jobs on an existing Session cluster. + +
+ FlinkDeployment in Application modes, Click to toggle content! + + ```yaml + apiVersion: flink.apache.org/v1beta1 + kind: FlinkDeployment + metadata: + namespace: default + name: basic-example + spec: + image: flink:1.16 + flinkVersion: v1_16 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 2 + upgradeMode: stateless + state: running + ``` +
+ +
+ + + +**FlinkSessionJob** +- The `FlinkSessionJob` CR defines the session job on the **Session cluster** and each Session cluster can run multiple `FlinkSessionJob`. +- Session deployments manage Flink Session clusters without providing any job management for it + +
+ FlinkSessionJob using an existing "basic-session-cluster" session cluster deployment + + ```yaml + apiVersion: flink.apache.org/v1beta1 + kind: FlinkSessionJob + metadata: + name: basic-session-job-example + spec: + deploymentName: basic-session-cluster + job: + jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.3/flink-examples-streaming_2.12-1.15.3-TopSpeedWindowing.jar + parallelism: 4 + upgradeMode: stateless + ``` + +
+ +
+
+ +:::info +Session clusters use a similar spec to Application clusters with the only difference that `job` is not defined in the yaml spec. +::: + +:::info +According to the Flink documentation, it is recommended to use FlinkDeployment in Application mode for production environments. +::: + +On top of the deployment types the Flink Kubernetes Operator also supports two modes of deployments: `Native` and `Standalone`. + + + + +**Native** + +- Native cluster deployment is the default deployment mode and uses Flink’s built in integration with Kubernetes when deploying the cluster. +- Flink cluster communicates directly with Kubernetes and allows it to manage Kubernetes resources, e.g. dynamically allocate and de-allocate TaskManager pods. +- Flink Native can be useful for advanced users who want to build their own cluster management system or integrate with existing management systems. +- Flink Native allows for more flexibility in terms of job scheduling and execution. +- For standard Operator use, running your own Flink Jobs in Native mode is recommended. + +```yaml +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +... +spec: +... +mode: native +``` + + + + +**Standalone** + +- Standalone cluster deployment simply uses Kubernetes as an orchestration platform that the Flink cluster is running on. +- Flink is unaware that it is running on Kubernetes and therefore all Kubernetes resources need to be managed externally, by the Kubernetes Operator. + + ```yaml + apiVersion: flink.apache.org/v1beta1 + kind: FlinkDeployment + ... + spec: + ... + mode: standalone + ``` + + + + +## Best Practices for Running Flink Jobs on Kubernetes +To get the most out of Flink on Kubernetes, here are some best practices to follow: + +- **Use the Kubernetes Operator**: Install and use the Flink Kubernetes Operator to automate the deployment and management of Flink clusters on Kubernetes. +- **Deploy in dedicated namespaces**: Create a separate namespace for the Flink Kubernetes Operator and another one for Flink jobs/workloads. This ensures that the Flink jobs are isolated and have their own resources. +- **Use high-quality storage**: Store Flink checkpoints and savepoints in high-quality storage such as Amazon S3 or another durable external storage. These storage options are reliable, scalable, and offer durability for large volumes of data. +- **Optimize resource allocation**: Allocate sufficient resources to Flink jobs to ensure optimal performance. This can be done by setting resource requests and limits for Flink containers. +- **Proper network isolation**: Use Kubernetes Network Policies to isolate Flink jobs from other workloads running on the same Kubernetes cluster. This ensures that Flink jobs have the required network access without being impacted by other workloads. +- **Configure Flink optimally**: Tune Flink settings according to your use case. For example, adjust Flink's parallelism settings to ensure that Flink jobs are scaled appropriately based on the size of the input data. +- **Use checkpoints and savepoints**: Use checkpoints for periodic snapshots of Flink application state and savepoints for more advanced use cases such as upgrading or downgrading the application. +- **Store checkpoints and savepoints in the right places**: Store checkpoints in distributed file systems or key-value stores like Amazon S3 or another durable external storage. Store savepoints in a durable external storage like Amazon S3. + +## Flink Upgrade +Flink Operator provides three upgrade modes for Flink jobs. Checkout the [Flink upgrade docs](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades) for up-to-date information. + +1. **stateless**: Stateless application upgrades from empty state +2. **last-state**: Quick upgrades in any application state (even for failing jobs), does not require a healthy job as it always uses the latest checkpoint information. Manual recovery may be necessary if HA metadata is lost. +3. **savepoint**: Use savepoint for upgrade, providing maximal safety and possibility to serve as backup/fork point. The savepoint will be created during the upgrade process. Note that the Flink job needs to be running to allow the savepoint to get created. If the job is in an unhealthy state, the last checkpoint will be used (unless kubernetes.operator.job.upgrade.last-state-fallback.enabled is set to false). If the last checkpoint is not available, the job upgrade will fail. + +:::info +`last-state` or `savepoint` are recommended modes for production +::: + + +Deploying the Solution}> + +In this [example](https://github.com/awslabs/data-on-eks/tree/main/streaming/flink), you will provision the following resources required to run Flink Jobs with Flink Operator and Apache YuniKorn. + +This example deploys an EKS Cluster running the Flink Operator into a new VPC. + +- Creates a new sample VPC, 2 Private Subnets and 2 Public Subnets +- Creates Internet gateway for Public Subnets and NAT Gateway for Private Subnets +- Creates EKS Cluster Control plane with public endpoint (for demo reasons only) with core managed node group, on-demand node group and Spot node group for Flink workloads +- Deploys Metrics server, Cluster Autoscaler, Apache YuniKorn, Karpenter, Grafana, AMP and Prometheus server +- Deploys Cert Manager and EMR Flink Operator. Flink Operator has dependency on Cert Manager +- Creates a new Flink Data team resources that includes namespace, IRSA, Role and Role binding +- Deploys Karpenter provisioner for flink-compute-optimized types + +### Prerequisites + +Ensure that you have installed the following tools on your machine. + +1. [aws cli](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) +2. [kubectl](https://Kubernetes.io/docs/tasks/tools/) +3. [terraform](https://learn.hashicorp.com/tutorials/terraform/install-cli) + +### Deploy + +Clone the repository. + +```bash +git clone https://github.com/awslabs/data-on-eks.git +``` + +Navigate into Flink's Terraform template directory and run `install.sh` script. + +```bash +cd data-on-eks/streaming/emr-flink-eks +chmod +x install.sh +./install.sh +``` +Verify the cluster status + +```bash + ➜ kubectl get nodes -A + NAME STATUS ROLES AGE VERSION + ip-10-1-160-150.us-west-2.compute.internal Ready 24h v1.24.11-eks-a59e1f0 + ip-10-1-169-249.us-west-2.compute.internal Ready 6d v1.24.11-eks-a59e1f0 + ip-10-1-69-244.us-west-2.compute.internal Ready 6d v1.24.11-eks-a59e1f0 + + ➜ ~ kubectl get pods -n flink-kubernetes-operator-ns + NAME READY STATUS RESTARTS AGE + flink-kubernetes-operator-555776785f-pzx8p 2/2 Running 0 4h21m + flink-kubernetes-operator-555776785f-z5jpt 2/2 Running 0 4h18m + + ➜ ~ kubectl get pods -n cert-manager + NAME READY STATUS RESTARTS AGE + cert-manager-77fc7548dc-dzdms 1/1 Running 0 24h + cert-manager-cainjector-8869b7ff7-4w754 1/1 Running 0 24h + cert-manager-webhook-586ddf8589-g6s87 1/1 Running 0 24h +``` + +To list all the resources created for Flink team to run Flink jobs using this namespace + +```bash + ➜ ~ kubectl get all,role,rolebinding,serviceaccount --namespace flink-team-a-ns + NAME CREATED AT + role.rbac.authorization.k8s.io/flink-team-a-role 2023-04-06T13:17:05Z + + NAME ROLE AGE + rolebinding.rbac.authorization.k8s.io/flink-team-a-role-binding Role/flink-team-a-role 22h + + NAME SECRETS AGE + serviceaccount/default 0 22h + serviceaccount/flink-team-a-sa 0 22h +``` + + + + +Execute Sample Flink job with Karpenter}> + +Navigate to example directory and submit the Flink job. + +```bash +cd data-on-eks/streaming/emr-eks-flink/examples/karpenter +``` +Get the role arn linked to the job execution service account. +```bash +terraform output flink_job_execution_role_arn +``` +Get the S3 bucket name for checkpoint,savepoint,logs and job storage data. +```bash +terraform output flink_operator_bucket +``` + +Open the basic-example-app-cluster.yaml in any editor and replace the place holder for **JOB_EXECUTION_ROLE_ARN** with the flink_job_execution_role_arn terraform output command. Replace the **ENTER_S3_BUCKET** placeholder with the flink_operator_bucket output. + +Deploy the job by running the kubectl deploy command. + +```bash +kubectl apply -f basic-example-app-cluster.yaml +``` + +Monitor the job status using the below command. +You should see the new nodes triggered by the karpenter and the YuniKorn will schedule one Job manager pod and one Taskmanager pods on this node. + +```bash +kubectl get deployments -n flink-team-a-ns +NAME READY UP-TO-DATE AVAILABLE AGE +basic-example-app-cluster-flink 2/2 2 2 3h6m + +kubectl get pods -n flink-team-a-ns +NAME READY STATUS RESTARTS AGE +basic-example-app-cluster-flink-7c7d9c6fd9-cdfmd 2/2 Running 0 3h7m +basic-example-app-cluster-flink-7c7d9c6fd9-pjxj2 2/2 Running 0 3h7m +basic-example-app-cluster-flink-taskmanager-1-1 2/2 Running 0 3h6m + +kubectl get services -n flink-team-a-ns +NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE +basic-example-app-cluster-flink-rest ClusterIP 172.20.17.152 8081/TCP 3h7m +``` + +To access the Flink WebUI for the job run this command locally. + +```bash +kubectl port-forward svc/basic-example-app-cluster-flink-rest 8081 -n flink-team-a-ns +``` + +![Flink Job UI](img/flink1.png) +![Flink Job UI](img/flink2.png) +![Flink Job UI](img/flink3.png) +![Flink Job UI](img/flink4.png) +![Flink Job UI](img/flink5.png) + + + + +Cleanup}> + +This script will cleanup the environment using `-target` option to ensure all the resources are deleted in correct order. + +```bash +cd .. && chmod +x cleanup.sh +./cleanup.sh +``` + + + +:::caution +To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment +:::