From 21a5c4bf92c719922f7861ee464a67c799ea246d Mon Sep 17 00:00:00 2001 From: lvthillo Date: Mon, 25 Nov 2024 22:53:36 +0100 Subject: [PATCH 01/10] feat: add provisioned_poller_config for kafka in lambda event source mapping --- .../service/lambda/event_source_mapping.go | 69 +++++++++++++++++++ .../lambda/event_source_mapping_test.go | 19 +++++ .../lambda_event_source_mapping.html.markdown | 11 +++ 3 files changed, 99 insertions(+) diff --git a/internal/service/lambda/event_source_mapping.go b/internal/service/lambda/event_source_mapping.go index bcbb79a40c5..d97c8576f9e 100644 --- a/internal/service/lambda/event_source_mapping.go +++ b/internal/service/lambda/event_source_mapping.go @@ -244,6 +244,28 @@ func resourceEventSourceMapping() *schema.Resource { Computed: true, ValidateFunc: validation.IntBetween(1, 10), }, + "provisioned_poller_config": { + Type: schema.TypeList, + Optional: true, + Computed: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "maximum_pollers": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.IntBetween(1, 2000), + }, + "minimum_pollers": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.IntBetween(1, 200), + }, + }, + }, + }, "queues": { Type: schema.TypeList, Optional: true, @@ -447,6 +469,10 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat input.MaximumRetryAttempts = aws.Int32(int32(v.(int))) } + if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{})) + } + if v, ok := d.GetOk("parallelization_factor"); ok { input.ParallelizationFactor = aws.Int32(int32(v.(int))) } @@ -584,6 +610,13 @@ func resourceEventSourceMappingRead(ctx context.Context, d *schema.ResourceData, d.Set("maximum_batching_window_in_seconds", output.MaximumBatchingWindowInSeconds) d.Set("maximum_record_age_in_seconds", output.MaximumRecordAgeInSeconds) d.Set("maximum_retry_attempts", output.MaximumRetryAttempts) + if v := output.ProvisionedPollerConfig; v != nil { + if err := d.Set("provisioned_poller_config", []interface{}{flattenProvisionedPollerConfig(v)}); err != nil { + return sdkdiag.AppendErrorf(diags, "setting provisioned_poller_config: %s", err) + } + } else { + d.Set("provisioned_poller_config", nil) + } d.Set("parallelization_factor", output.ParallelizationFactor) d.Set("queues", output.Queues) if v := output.ScalingConfig; v != nil { @@ -1107,6 +1140,42 @@ func flattenSelfManagedKafkaEventSourceConfig(apiObject *awstypes.SelfManagedKaf return tfMap } +func expandProvisionedPollerConfig(tfMap map[string]interface{}) *awstypes.ProvisionedPollerConfig { + if tfMap == nil { + return nil + } + + apiObject := &awstypes.ProvisionedPollerConfig{} + + if v, ok := tfMap["maximum_pollers"].(int); ok && v != 0 { + apiObject.MaximumPollers = aws.Int32(int32(v)) + } + + if v, ok := tfMap["minimum_pollers"].(int); ok && v != 0 { + apiObject.MinimumPollers = aws.Int32(int32(v)) + } + + return apiObject +} + +func flattenProvisionedPollerConfig(apiObject *awstypes.ProvisionedPollerConfig) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.MaximumPollers; v != nil { + tfMap["maximum_pollers"] = aws.ToInt32(v) + } + + if v := apiObject.MinimumPollers; v != nil { + tfMap["minimum_pollers"] = aws.ToInt32(v) + } + + return tfMap +} + func expandSourceAccessConfiguration(tfMap map[string]interface{}) *awstypes.SourceAccessConfiguration { if tfMap == nil { return nil diff --git a/internal/service/lambda/event_source_mapping_test.go b/internal/service/lambda/event_source_mapping_test.go index 3829b2c2d08..8fc246fb745 100644 --- a/internal/service/lambda/event_source_mapping_test.go +++ b/internal/service/lambda/event_source_mapping_test.go @@ -881,6 +881,8 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "60"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -906,6 +908,8 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "60"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -940,6 +944,9 @@ func TestAccLambdaEventSourceMapping_mskWithEventSourceConfig(t *testing.T) { acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "80"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "10"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -1022,6 +1029,9 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.0.consumer_group_id", "self-managed-test-group-id"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "80"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "10"), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), @@ -2367,6 +2377,10 @@ resource "aws_lambda_event_source_mapping" "test" { topics = ["test"] starting_position = "TRIM_HORIZON" + provisioned_poller_config { + maximum_pollers = 60 + } + depends_on = [aws_iam_policy_attachment.test] } `, rName, batchSize)) @@ -2408,6 +2422,11 @@ resource "aws_lambda_event_source_mapping" "test" { consumer_group_id = "amazon-managed-test-group-id" } + provisioned_poller_config { + maximum_pollers = 80 + minimum_pollers = 10 + } + depends_on = [aws_iam_policy_attachment.test] } `, rName, batchSize)) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index e1191136cae..038cee22c16 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -58,6 +58,11 @@ resource "aws_lambda_event_source_mapping" "example" { topics = ["Example"] starting_position = "TRIM_HORIZON" + provisioned_poller_config { + maximum_poller = 80 + minimum_poller = 10 + } + self_managed_event_source { endpoints = { KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9092,kafka2.example.com:9092" @@ -166,6 +171,7 @@ resource "aws_lambda_event_source_mapping" "example" { * `maximum_record_age_in_seconds`: - (Optional) The maximum age of a record that Lambda sends to a function for processing. Only available for stream sources (DynamoDB and Kinesis). Must be either -1 (forever, and the default value) or between 60 and 604800 (inclusive). * `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. * `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10. +* `provisioned_poller_config`: - (Optional) Amazon MSK and self-managed Apache Kafka only, the provisioned mode configuration for the event source. * `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. The list must contain exactly one queue name. * `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below. * `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below. @@ -203,6 +209,11 @@ resource "aws_lambda_event_source_mapping" "example" { * `pattern` - (Optional) A filter pattern up to 4096 characters. See [Filter Rule Syntax](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax). +### provisioned_poller_config Configuration Block + +* `maximum_pollers` - (Optional) The maximum number of event pollers this event source can scale up to. The range is between 1 and 2000. +* `minimum_pollers` - (Optional) The minimum number of event pollers this event source can scale down to. The range is between 1 and 200. + ### scaling_config Configuration Block * `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be greater than or equal to `2`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency). You need to raise a [Service Quota Ticket](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) to increase the concurrency beyond 1000. From e9b9008bb0c9ccaf9afd94de0d34c691aee7edbf Mon Sep 17 00:00:00 2001 From: lvthillo Date: Mon, 25 Nov 2024 22:58:40 +0100 Subject: [PATCH 02/10] feat: add changelog --- .changelog/40303.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/40303.txt diff --git a/.changelog/40303.txt b/.changelog/40303.txt new file mode 100644 index 00000000000..3109098ca14 --- /dev/null +++ b/.changelog/40303.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_lambda_event_source_mapping: Add `provisioned_poller_config` argument to `aws_lambda_event_source_mapping` for MSK. +``` \ No newline at end of file From a0251899ef5d1c63f912b26cb16aa0d243688aa4 Mon Sep 17 00:00:00 2001 From: lvthillo Date: Mon, 25 Nov 2024 23:00:58 +0100 Subject: [PATCH 03/10] fix: fix formatting in doc --- website/docs/r/lambda_event_source_mapping.html.markdown | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 038cee22c16..a4816a47689 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -59,8 +59,8 @@ resource "aws_lambda_event_source_mapping" "example" { starting_position = "TRIM_HORIZON" provisioned_poller_config { - maximum_poller = 80 - minimum_poller = 10 + maximum_poller = 80 + minimum_poller = 10 } self_managed_event_source { From 8787d3ab4192cb8dfe8e690c22ff80ab2c6a01c3 Mon Sep 17 00:00:00 2001 From: lvthillo Date: Tue, 26 Nov 2024 07:43:30 +0100 Subject: [PATCH 04/10] fix: fix acctest --- internal/service/lambda/event_source_mapping_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/service/lambda/event_source_mapping_test.go b/internal/service/lambda/event_source_mapping_test.go index 8fc246fb745..4cd9435c7a3 100644 --- a/internal/service/lambda/event_source_mapping_test.go +++ b/internal/service/lambda/event_source_mapping_test.go @@ -2463,6 +2463,7 @@ resource "aws_lambda_event_source_mapping" "test" { type = "VPC_SECURITY_GROUP" uri = aws_security_group.test.id } + } `, rName, batchSize, kafkaBootstrapServers)) } @@ -2502,6 +2503,11 @@ resource "aws_lambda_event_source_mapping" "test" { type = "VPC_SECURITY_GROUP" uri = aws_security_group.test.id } + + provisioned_poller_config { + maximum_pollers = 80 + minimum_pollers = 10 + } } `, rName, batchSize, kafkaBootstrapServers)) } From 51c8c4d61259ed85e5290ea0c0be70b992a62adc Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 27 Nov 2024 09:25:20 -0500 Subject: [PATCH 05/10] Tweak CHANGELOG entry. --- .changelog/40303.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changelog/40303.txt b/.changelog/40303.txt index 3109098ca14..3490cc62ef5 100644 --- a/.changelog/40303.txt +++ b/.changelog/40303.txt @@ -1,3 +1,3 @@ ```release-note:enhancement -resource/aws_lambda_event_source_mapping: Add `provisioned_poller_config` argument to `aws_lambda_event_source_mapping` for MSK. +resource/aws_lambda_event_source_mapping: Add `provisioned_poller_config` argument ``` \ No newline at end of file From 868da5df727959ae5a6fe5ac05f24c83f5326299 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 27 Nov 2024 11:41:26 -0500 Subject: [PATCH 06/10] Tidy up. --- .../service/lambda/event_source_mapping.go | 10 +- .../lambda/event_source_mapping_test.go | 130 ++++++++++++++---- 2 files changed, 110 insertions(+), 30 deletions(-) diff --git a/internal/service/lambda/event_source_mapping.go b/internal/service/lambda/event_source_mapping.go index d97c8576f9e..2247f2ea892 100644 --- a/internal/service/lambda/event_source_mapping.go +++ b/internal/service/lambda/event_source_mapping.go @@ -247,7 +247,6 @@ func resourceEventSourceMapping() *schema.Resource { "provisioned_poller_config": { Type: schema.TypeList, Optional: true, - Computed: true, MaxItems: 1, Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ @@ -738,6 +737,15 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat input.ParallelizationFactor = aws.Int32(int32(d.Get("parallelization_factor").(int))) } + if d.HasChange("provisioned_poller_config") { + if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{})) + } else { + // AWS ignores the removal if this is left as nil. + input.ProvisionedPollerConfig = &awstypes.ProvisionedPollerConfig{} + } + } + if d.HasChange("scaling_config") { if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{})) diff --git a/internal/service/lambda/event_source_mapping_test.go b/internal/service/lambda/event_source_mapping_test.go index 4cd9435c7a3..b975814cf14 100644 --- a/internal/service/lambda/event_source_mapping_test.go +++ b/internal/service/lambda/event_source_mapping_test.go @@ -58,6 +58,7 @@ func TestAccLambdaEventSourceMapping_Kinesis_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "function_response_types.#", "0"), resource.TestCheckResourceAttr(resourceName, names.AttrKMSKeyARN, ""), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"), ), }, @@ -877,12 +878,11 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) { Config: testAccEventSourceMappingConfig_msk(rName, "100"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "60"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -904,12 +904,10 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) { Config: testAccEventSourceMappingConfig_msk(rName, "9999"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "batch_size", "9999"), resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "60"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -939,14 +937,12 @@ func TestAccLambdaEventSourceMapping_mskWithEventSourceConfig(t *testing.T) { Config: testAccEventSourceMappingConfig_mskWithEventSourceConfig(rName, "100"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "80"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "10"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -979,11 +975,11 @@ func TestAccLambdaEventSourceMapping_selfManagedKafka(t *testing.T) { testAccCheckEventSourceMappingExists(ctx, resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse), + acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), - acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -1024,15 +1020,13 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te testAccCheckEventSourceMappingExists(ctx, resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse), + acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.0.consumer_group_id", "self-managed-test-group-id"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "80"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "10"), - acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -1047,6 +1041,53 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te }) } +func TestAccLambdaEventSourceMapping_selfManagedKafkaWithProvisionedPollerConfig(t *testing.T) { + ctx := acctest.Context(t) + var v lambda.GetEventSourceMappingOutput + resourceName := "aws_lambda_event_source_mapping.test" + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, names.LambdaServiceID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckEventSourceMappingDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "100", "null"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "100"), + resource.TestCheckResourceAttrSet(resourceName, "provisioned_poller_config.0.minimum_pollers"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"last_modified"}, + }, + { + Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "150", "15"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "150"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "15"), + ), + }, + { + Config: testAccEventSourceMappingConfig_selfManagedKafka(rName, "100", "test1:9092,test2:9092"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), + ), + }, + }, + }) +} + func TestAccLambdaEventSourceMapping_activeMQ(t *testing.T) { ctx := acctest.Context(t) if testing.Short() { @@ -2377,10 +2418,6 @@ resource "aws_lambda_event_source_mapping" "test" { topics = ["test"] starting_position = "TRIM_HORIZON" - provisioned_poller_config { - maximum_pollers = 60 - } - depends_on = [aws_iam_policy_attachment.test] } `, rName, batchSize)) @@ -2422,11 +2459,6 @@ resource "aws_lambda_event_source_mapping" "test" { consumer_group_id = "amazon-managed-test-group-id" } - provisioned_poller_config { - maximum_pollers = 80 - minimum_pollers = 10 - } - depends_on = [aws_iam_policy_attachment.test] } `, rName, batchSize)) @@ -2463,7 +2495,6 @@ resource "aws_lambda_event_source_mapping" "test" { type = "VPC_SECURITY_GROUP" uri = aws_security_group.test.id } - } `, rName, batchSize, kafkaBootstrapServers)) } @@ -2503,13 +2534,54 @@ resource "aws_lambda_event_source_mapping" "test" { type = "VPC_SECURITY_GROUP" uri = aws_security_group.test.id } +} +`, rName, batchSize, kafkaBootstrapServers)) +} + +func testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers string) string { + if batchSize == "" { + batchSize = "null" + } + if maxPollers == "" { + maxPollers = "null" + } + if minPollers == "" { + minPollers = "null" + } + + return acctest.ConfigCompose(testAccEventSourceMappingConfig_kafkaBase(rName), fmt.Sprintf(` +resource "aws_lambda_event_source_mapping" "test" { + batch_size = %[2]s + enabled = false + function_name = aws_lambda_function.test.arn + topics = ["test"] + starting_position = "TRIM_HORIZON" + + self_managed_event_source { + endpoints = { + KAFKA_BOOTSTRAP_SERVERS = %[3]q + } + } + + dynamic "source_access_configuration" { + for_each = aws_subnet.test[*].id + content { + type = "VPC_SUBNET" + uri = "subnet:${source_access_configuration.value}" + } + } + + source_access_configuration { + type = "VPC_SECURITY_GROUP" + uri = aws_security_group.test.id + } provisioned_poller_config { - maximum_pollers = 80 - minimum_pollers = 10 + maximum_pollers = %[4]s + minimum_pollers = %[5]s } } -`, rName, batchSize, kafkaBootstrapServers)) +`, rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers)) } func testAccEventSourceMappingConfig_dynamoDBBatchSize(rName, batchSize string) string { From 3e1864c1b2dbf16e54e5f4398ad20dcb0a66bd2b Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 27 Nov 2024 14:56:30 -0500 Subject: [PATCH 07/10] Update website/docs/r/lambda_event_source_mapping.html.markdown Co-authored-by: Graham Davison --- website/docs/r/lambda_event_source_mapping.html.markdown | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 46419a1e991..8953c562dbf 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -172,7 +172,9 @@ resource "aws_lambda_event_source_mapping" "example" { * `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. * `metrics_config`: - (Optional) CloudWatch metrics configuration of the event source. Only available for stream sources (DynamoDB and Kinesis) and SQS queues. Detailed below. * `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10. -* `provisioned_poller_config`: - (Optional) Amazon MSK and self-managed Apache Kafka only, the provisioned mode configuration for the event source. +* `provisioned_poller_config`: - (Optional) Event poller configuration for the event source. + Only valid for Amazon MSK or self-managed Apache Kafka sources. + Detailed below. * `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. The list must contain exactly one queue name. * `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below. * `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below. From b1491cdcd67198559adaa4addea8b4f45596bdc9 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 27 Nov 2024 14:57:08 -0500 Subject: [PATCH 08/10] Update internal/service/lambda/event_source_mapping_test.go Co-authored-by: Graham Davison --- internal/service/lambda/event_source_mapping_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/service/lambda/event_source_mapping_test.go b/internal/service/lambda/event_source_mapping_test.go index 3ecd5d65cf2..1d1408cdf98 100644 --- a/internal/service/lambda/event_source_mapping_test.go +++ b/internal/service/lambda/event_source_mapping_test.go @@ -977,6 +977,7 @@ func TestAccLambdaEventSourceMapping_selfManagedKafka(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"), From 955fd22464e14928413bce5587e12aa5ba4efaf6 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 27 Nov 2024 15:04:40 -0500 Subject: [PATCH 09/10] Fix markdown-lint 'MD009/no-trailing-spaces Trailing spaces [Expected: 0 or 2; Actual: 1]'. --- .../docs/r/lambda_event_source_mapping.html.markdown | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 8953c562dbf..146ccbc69d3 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -172,9 +172,7 @@ resource "aws_lambda_event_source_mapping" "example" { * `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. * `metrics_config`: - (Optional) CloudWatch metrics configuration of the event source. Only available for stream sources (DynamoDB and Kinesis) and SQS queues. Detailed below. * `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10. -* `provisioned_poller_config`: - (Optional) Event poller configuration for the event source. - Only valid for Amazon MSK or self-managed Apache Kafka sources. - Detailed below. +* `provisioned_poller_config`: - (Optional) Event poller configuration for the event source. Only valid for Amazon MSK or self-managed Apache Kafka sources. Detailed below. * `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. The list must contain exactly one queue name. * `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below. * `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below. @@ -212,15 +210,15 @@ resource "aws_lambda_event_source_mapping" "example" { * `pattern` - (Optional) A filter pattern up to 4096 characters. See [Filter Rule Syntax](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax). +### metrics_config Configuration Block + +* `metrics` - (Required) A list containing the metrics to be produced by the event source mapping. Valid values: `EventCount`. + ### provisioned_poller_config Configuration Block * `maximum_pollers` - (Optional) The maximum number of event pollers this event source can scale up to. The range is between 1 and 2000. * `minimum_pollers` - (Optional) The minimum number of event pollers this event source can scale down to. The range is between 1 and 200. -### metrics_config Configuration Block - -* `metrics` - (Required) A list containing the metrics to be produced by the event source mapping. Valid values: `EventCount`. - ### scaling_config Configuration Block * `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be greater than or equal to `2`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency). You need to raise a [Service Quota Ticket](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) to increase the concurrency beyond 1000. From d6119da7db656c1e31f260df62b62170a4506374 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Wed, 27 Nov 2024 15:06:44 -0500 Subject: [PATCH 10/10] Tweak 'TestAccLambdaEventSourceMapping_selfManagedKafkaWithProvisionedPollerConfig' after review. --- internal/service/lambda/event_source_mapping_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/service/lambda/event_source_mapping_test.go b/internal/service/lambda/event_source_mapping_test.go index 1d1408cdf98..28db7da75a7 100644 --- a/internal/service/lambda/event_source_mapping_test.go +++ b/internal/service/lambda/event_source_mapping_test.go @@ -1056,11 +1056,11 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithProvisionedPollerConfig CheckDestroy: testAccCheckEventSourceMappingDestroy(ctx), Steps: []resource.TestStep{ { - Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "100", "null"), + Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "123", "null"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "100"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "123"), resource.TestCheckResourceAttrSet(resourceName, "provisioned_poller_config.0.minimum_pollers"), ), },