Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add EMR runtime for flink operator blueprint #485

Merged
merged 25 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 86 additions & 6 deletions streaming/emr-eks-flink/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,85 @@
# EMR-EKS-Flink Blueprint

Checkout the [documentation website](https://awslabs.github.io/data-on-eks/docs/blueprints/streaming/emr-eks-flink) to deploy this pattern and run sample tests.

<!-- BEGINNING OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
## Requirements

| Name | Version |
|------|---------|
| <a name="requirement_terraform"></a> [terraform](#requirement\_terraform) | >= 1.0.0 |
| <a name="requirement_aws"></a> [aws](#requirement\_aws) | >= 3.72 |
| <a name="requirement_helm"></a> [helm](#requirement\_helm) | >= 2.13.0 |
| <a name="requirement_kubectl"></a> [kubectl](#requirement\_kubectl) | >= 1.14 |
| <a name="requirement_kubernetes"></a> [kubernetes](#requirement\_kubernetes) | >= 2.10 |
| <a name="requirement_random"></a> [random](#requirement\_random) | 3.3.2 |

## Providers

| Name | Version |
|------|---------|
| <a name="provider_aws"></a> [aws](#provider\_aws) | >= 3.72 |
| <a name="provider_aws.ecr"></a> [aws.ecr](#provider\_aws.ecr) | >= 3.72 |
| <a name="provider_kubernetes"></a> [kubernetes](#provider\_kubernetes) | >= 2.10 |

## Modules

| Name | Source | Version |
|------|--------|---------|
| <a name="module_ebs_csi_driver_irsa"></a> [ebs\_csi\_driver\_irsa](#module\_ebs\_csi\_driver\_irsa) | terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks | ~> 5.20 |
| <a name="module_eks"></a> [eks](#module\_eks) | terraform-aws-modules/eks/aws | ~> 19.15 |
| <a name="module_eks_blueprints_addons"></a> [eks\_blueprints\_addons](#module\_eks\_blueprints\_addons) | aws-ia/eks-blueprints-addons/aws | ~> 1.2 |
| <a name="module_eks_data_addons"></a> [eks\_data\_addons](#module\_eks\_data\_addons) | aws-ia/eks-data-addons/aws | ~> 1.30 |
| <a name="module_flink_irsa_jobs"></a> [flink\_irsa\_jobs](#module\_flink\_irsa\_jobs) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 |
| <a name="module_flink_irsa_operator"></a> [flink\_irsa\_operator](#module\_flink\_irsa\_operator) | aws-ia/eks-blueprints-addon/aws | ~> 1.0 |
| <a name="module_s3_bucket"></a> [s3\_bucket](#module\_s3\_bucket) | terraform-aws-modules/s3-bucket/aws | ~> 3.0 |
| <a name="module_vpc"></a> [vpc](#module\_vpc) | terraform-aws-modules/vpc/aws | 5.5.1 |

## Resources

| Name | Type |
|------|------|
| [aws_cloudwatch_log_group.flink_team_a](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_iam_policy.flink](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [aws_s3_object.checkpoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_s3_object.jobmanager](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_s3_object.logs](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_s3_object.savepoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [kubernetes_namespace_v1.flink_team_a](https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs/resources/namespace_v1) | resource |
| [aws_ami.x86](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ami) | data source |
| [aws_availability_zones.available](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/availability_zones) | data source |
| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source |
| [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source |
| [aws_eks_cluster_auth.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/eks_cluster_auth) | data source |
| [aws_iam_policy_document.flink_sample_job](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_eks_cluster_version"></a> [eks\_cluster\_version](#input\_eks\_cluster\_version) | EKS version for the cluster | `string` | `"1.28"` | no |
| <a name="input_name"></a> [name](#input\_name) | Name of the VPC and EKS Cluster | `string` | `"emr-eks-flink"` | no |
| <a name="input_region"></a> [region](#input\_region) | Region for deployment | `string` | `"us-west-2"` | no |

## Outputs

| Name | Description |
|------|-------------|
| <a name="output_flink_checkpoint_path"></a> [flink\_checkpoint\_path](#output\_flink\_checkpoint\_path) | S3 path for checkpoint data |
| <a name="output_flink_jobmanager_path"></a> [flink\_jobmanager\_path](#output\_flink\_jobmanager\_path) | S3 path for jobmanager data |
| <a name="output_flink_jobs_role_arn"></a> [flink\_jobs\_role\_arn](#output\_flink\_jobs\_role\_arn) | IAM linked role for the flink job |
| <a name="output_flink_logs_path"></a> [flink\_logs\_path](#output\_flink\_logs\_path) | S3 path for logs |
| <a name="output_flink_operator_role_arn"></a> [flink\_operator\_role\_arn](#output\_flink\_operator\_role\_arn) | IAM linked role for the flink operator |
| <a name="output_flink_savepoint_path"></a> [flink\_savepoint\_path](#output\_flink\_savepoint\_path) | S3 path for savepoint data |
<!-- END OF PRE-COMMIT-TERRAFORM DOCS HOOK -->
<!-- BEGIN_TF_DOCS -->
## Requirements

| Name | Version |
|------|---------|
| <a name="requirement_terraform"></a> [terraform](#requirement\_terraform) | >= 1.0.0 |
| <a name="requirement_aws"></a> [aws](#requirement\_aws) | >= 3.72 |
| <a name="requirement_helm"></a> [helm](#requirement\_helm) | >= 2.4.1 |
| <a name="requirement_helm"></a> [helm](#requirement\_helm) | >= 2.13.0 |
| <a name="requirement_kubectl"></a> [kubectl](#requirement\_kubectl) | >= 1.14 |
| <a name="requirement_kubernetes"></a> [kubernetes](#requirement\_kubernetes) | >= 2.10 |
| <a name="requirement_random"></a> [random](#requirement\_random) | 3.3.2 |
Expand All @@ -14,9 +88,8 @@

| Name | Version |
|------|---------|
| <a name="provider_aws"></a> [aws](#provider\_aws) | 5.45.0 |
| <a name="provider_aws.ecr"></a> [aws.ecr](#provider\_aws.ecr) | 5.45.0 |
| <a name="provider_helm"></a> [helm](#provider\_helm) | 2.13.0 |
| <a name="provider_aws"></a> [aws](#provider\_aws) | 5.46.0 |
| <a name="provider_aws.ecr"></a> [aws.ecr](#provider\_aws.ecr) | 5.46.0 |
| <a name="provider_kubernetes"></a> [kubernetes](#provider\_kubernetes) | 2.29.0 |

## Modules
Expand All @@ -38,14 +111,17 @@
|------|------|
| [aws_cloudwatch_log_group.flink_team_a](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_iam_policy.flink](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/iam_policy) | resource |
| [helm_release.flink_kubernetes_operator](https://registry.terraform.io/providers/hashicorp/helm/latest/docs/resources/release) | resource |
| [aws_s3_object.checkpoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_s3_object.jobmanager](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_s3_object.logs](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [aws_s3_object.savepoints](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_object) | resource |
| [kubernetes_namespace_v1.flink_team_a](https://registry.terraform.io/providers/hashicorp/kubernetes/latest/docs/resources/namespace_v1) | resource |
| [aws_ami.x86](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ami) | data source |
| [aws_availability_zones.available](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/availability_zones) | data source |
| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source |
| [aws_ecrpublic_authorization_token.token](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/ecrpublic_authorization_token) | data source |
| [aws_eks_cluster_auth.this](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/eks_cluster_auth) | data source |
| [aws_iam_policy_document.flink_operator](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_iam_policy_document.flink_sample_job](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |

## Inputs

Expand All @@ -59,6 +135,10 @@

| Name | Description |
|------|-------------|
| <a name="output_flink_checkpoint_path"></a> [flink\_checkpoint\_path](#output\_flink\_checkpoint\_path) | S3 path for checkpoint data |
| <a name="output_flink_jobmanager_path"></a> [flink\_jobmanager\_path](#output\_flink\_jobmanager\_path) | S3 path for jobmanager data |
| <a name="output_flink_jobs_role_arn"></a> [flink\_jobs\_role\_arn](#output\_flink\_jobs\_role\_arn) | IAM linked role for the flink job |
| <a name="output_flink_logs_path"></a> [flink\_logs\_path](#output\_flink\_logs\_path) | S3 path for logs |
| <a name="output_flink_operator_role_arn"></a> [flink\_operator\_role\_arn](#output\_flink\_operator\_role\_arn) | IAM linked role for the flink operator |
| <a name="output_flink_savepoint_path"></a> [flink\_savepoint\_path](#output\_flink\_savepoint\_path) | S3 path for savepoint data |
<!-- END_TF_DOCS -->
95 changes: 89 additions & 6 deletions streaming/emr-eks-flink/addons.tf
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ module "s3_bucket" {
source = "terraform-aws-modules/s3-bucket/aws"
version = "~> 3.0"

bucket_prefix = "${local.name}-flink-logs-"
bucket_prefix = "${local.name}-"

# For example only - please evaluate for your environment
force_destroy = true

attach_deny_insecure_transport_policy = true
attach_require_latest_tls_policy = true
attach_require_latest_tls_policy = true

block_public_acls = true
block_public_policy = true
Expand Down Expand Up @@ -143,11 +139,27 @@ module "eks_blueprints_addons" {
# Data on EKS Kubernetes Addons
#---------------------------------------------------------------
module "eks_data_addons" {

depends_on = [module.flink_irsa_jobs, module.flink_irsa_operator]
#source = "[email protected]:mithun008/terraform-aws-eks-data-addons/"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

source = "aws-ia/eks-data-addons/aws"
version = "~> 1.30" # ensure to update this to the latest/desired version

oidc_provider_arn = module.eks.oidc_provider_arn
enable_karpenter_resources = true
enable_emr_flink_operator = true
emr_flink_operator_helm_config = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace this bloc with

  emr_flink_operator_helm_config = {
    repository               = "oci://public.ecr.aws/emr-on-eks"
    operatorExecutionRoleArn = module.flink_irsa_operator.iam_role_arn
  }
  

You can remove the repository and operatorExecutionRoleArn once you update these in Data addons repo

namespace = "emr-flink-operator"
create_namespace = true
namespace = "${local.flink_operator}-ns"
name = "flink-kubernetes-operator"
repository = "oci://public.ecr.aws/emr-on-eks"
chart = "flink-kubernetes-operator"
operatorExecutionRoleArn = module.flink_irsa_operator.iam_role_arn


}

karpenter_resources_helm_config = {
flink-compute-optimized = {
values = [
Expand Down Expand Up @@ -207,6 +219,77 @@ module "eks_data_addons" {
EOT
]
}
flink-graviton-memory-optimized = {
values = [
<<-EOT
name: flink-graviton-memory-optimized
clusterName: ${module.eks.cluster_name}
ec2NodeClass:
karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]}
subnetSelectorTerms:
tags:
Name: "${module.eks.cluster_name}-private*"
securityGroupSelectorTerms:
tags:
Name: ${module.eks.cluster_name}-node
instanceStorePolicy: RAID0
nodePool:
labels:
- type: karpenter
- NodeGroupType: FlinkGravitonMemoryOptimized
- multiArch: Flink
requirements:
- key: "karpenter.sh/capacity-type"
operator: In
values: ["spot", "on-demand"]
- key: "kubernetes.io/arch"
operator: In
values: ["arm64"]
- key: "karpenter.k8s.aws/instance-category"
operator: In
values: ["r"]
- key: "karpenter.k8s.aws/instance-family"
operator: In
values: ["r6gd"]
- key: "karpenter.k8s.aws/instance-cpu"
operator: In
values: ["4", "8", "16", "32"]
- key: "karpenter.k8s.aws/instance-hypervisor"
operator: In
values: ["nitro"]
- key: "karpenter.k8s.aws/instance-generation"
operator: Gt
values: ["2"]
limits:
cpu: 1000
disruption:
consolidationPolicy: WhenEmpty
consolidateAfter: 30s
expireAfter: 720h
weight: 50
EOT
]
}
}
}

resource "aws_s3_object" "checkpoints" {
bucket = module.s3_bucket.s3_bucket_id
key = "checkpoints/"
content_type = "application/x-directory"
}
resource "aws_s3_object" "savepoints" {
bucket = module.s3_bucket.s3_bucket_id
key = "savepoints/"
content_type = "application/x-directory"
}
resource "aws_s3_object" "jobmanager" {
bucket = module.s3_bucket.s3_bucket_id
key = "jobmanager/"
content_type = "application/x-directory"
}
resource "aws_s3_object" "logs" {
bucket = module.s3_bucket.s3_bucket_id
key = "logs/"
content_type = "application/x-directory"
}
64 changes: 0 additions & 64 deletions streaming/emr-eks-flink/data.tf

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
namespace: flink-team-a-ns
spec:
imagePullPolicy: Always
emrReleaseLabel: "emr-7.0.0-flink-latest"
emrReleaseLabel: "emr-7.1.0-flink-latest"
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
Expand All @@ -26,7 +26,7 @@ spec:
state.checkpoints.dir: s3://emr-flink-data/checkpoints
state.savepoints.dir: s3://emr-flink-data/savepoints


# Replace this execution role ARN with your own
executionRoleArn: arn:aws:iam::xxxxxxxxx:role/emr-eks-flink-flink-team-a-20240406012025932700000008
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to provide instruction to replace this execution role ARN on top of the file as a NOTE

jobManager:
# Replace with s3 bucket in your own account
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
namespace: flink-team-a-ns
spec:
imagePullPolicy: Always
emrReleaseLabel: "emr-7.0.0-flink-latest"
emrReleaseLabel: "emr-7.1.0-flink-latest"
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
Expand All @@ -26,8 +26,8 @@ spec:
state.checkpoints.dir: s3://emr-flink-data/checkpoints
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<<REPLACE_WITH_S3_BUCKET_ID>>

state.savepoints.dir: s3://emr-flink-data/savepoints


executionRoleArn: arn:aws:iam::xxxxxxxxx:role/emr-eks-flink-flink-team-a-20240406012025932700000008
# Replace this execution role ARN with your own
executionRoleArn: arn:aws:iam::681921237057:role/emr-eks-flink-flink-team-a-20240406012025932700000008

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should substitute this with variables, or provide instructions on how to generate the role arn (including the permissions).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the instructions in the website docs to substitute these values.


podTemplate:
apiVersion: v1
Expand Down
Loading
Loading