diff --git a/.changelog/40303.txt b/.changelog/40303.txt new file mode 100644 index 00000000000..3490cc62ef5 --- /dev/null +++ b/.changelog/40303.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_lambda_event_source_mapping: Add `provisioned_poller_config` argument +``` \ No newline at end of file diff --git a/internal/service/lambda/event_source_mapping.go b/internal/service/lambda/event_source_mapping.go index 2d9bf650f1c..83590022317 100644 --- a/internal/service/lambda/event_source_mapping.go +++ b/internal/service/lambda/event_source_mapping.go @@ -262,6 +262,27 @@ func resourceEventSourceMapping() *schema.Resource { Computed: true, ValidateFunc: validation.IntBetween(1, 10), }, + "provisioned_poller_config": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "maximum_pollers": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.IntBetween(1, 2000), + }, + "minimum_pollers": { + Type: schema.TypeInt, + Optional: true, + Computed: true, + ValidateFunc: validation.IntBetween(1, 200), + }, + }, + }, + }, "queues": { Type: schema.TypeList, Optional: true, @@ -473,6 +494,10 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat input.ParallelizationFactor = aws.Int32(int32(v.(int))) } + if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{})) + } + if v, ok := d.GetOk("queues"); ok && len(v.([]interface{})) > 0 { input.Queues = flex.ExpandStringValueList(v.([]interface{})) } @@ -614,6 +639,13 @@ func resourceEventSourceMappingRead(ctx context.Context, d *schema.ResourceData, d.Set("metrics_config", nil) } d.Set("parallelization_factor", output.ParallelizationFactor) + if v := output.ProvisionedPollerConfig; v != nil { + if err := d.Set("provisioned_poller_config", []interface{}{flattenProvisionedPollerConfig(v)}); err != nil { + return sdkdiag.AppendErrorf(diags, "setting provisioned_poller_config: %s", err) + } + } else { + d.Set("provisioned_poller_config", nil) + } d.Set("queues", output.Queues) if v := output.ScalingConfig; v != nil { if err := d.Set("scaling_config", []interface{}{flattenScalingConfig(v)}); err != nil { @@ -742,6 +774,15 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat input.ParallelizationFactor = aws.Int32(int32(d.Get("parallelization_factor").(int))) } + if d.HasChange("provisioned_poller_config") { + if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{})) + } else { + // AWS ignores the removal if this is left as nil. + input.ProvisionedPollerConfig = &awstypes.ProvisionedPollerConfig{} + } + } + if d.HasChange("scaling_config") { if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{})) @@ -1144,6 +1185,42 @@ func flattenSelfManagedKafkaEventSourceConfig(apiObject *awstypes.SelfManagedKaf return tfMap } +func expandProvisionedPollerConfig(tfMap map[string]interface{}) *awstypes.ProvisionedPollerConfig { + if tfMap == nil { + return nil + } + + apiObject := &awstypes.ProvisionedPollerConfig{} + + if v, ok := tfMap["maximum_pollers"].(int); ok && v != 0 { + apiObject.MaximumPollers = aws.Int32(int32(v)) + } + + if v, ok := tfMap["minimum_pollers"].(int); ok && v != 0 { + apiObject.MinimumPollers = aws.Int32(int32(v)) + } + + return apiObject +} + +func flattenProvisionedPollerConfig(apiObject *awstypes.ProvisionedPollerConfig) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.MaximumPollers; v != nil { + tfMap["maximum_pollers"] = aws.ToInt32(v) + } + + if v := apiObject.MinimumPollers; v != nil { + tfMap["minimum_pollers"] = aws.ToInt32(v) + } + + return tfMap +} + func expandSourceAccessConfiguration(tfMap map[string]interface{}) *awstypes.SourceAccessConfiguration { if tfMap == nil { return nil diff --git a/internal/service/lambda/event_source_mapping_test.go b/internal/service/lambda/event_source_mapping_test.go index dc80f06658c..28db7da75a7 100644 --- a/internal/service/lambda/event_source_mapping_test.go +++ b/internal/service/lambda/event_source_mapping_test.go @@ -59,6 +59,7 @@ func TestAccLambdaEventSourceMapping_Kinesis_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, names.AttrKMSKeyARN, ""), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "metrics_config.#", "0"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"), ), }, @@ -878,10 +879,11 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) { Config: testAccEventSourceMappingConfig_msk(rName, "100"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -903,10 +905,10 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) { Config: testAccEventSourceMappingConfig_msk(rName, "9999"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "batch_size", "9999"), resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -936,11 +938,12 @@ func TestAccLambdaEventSourceMapping_mskWithEventSourceConfig(t *testing.T) { Config: testAccEventSourceMappingConfig_mskWithEventSourceConfig(rName, "100"), Check: resource.ComposeTestCheckFunc( testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN), acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"), - resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -973,11 +976,12 @@ func TestAccLambdaEventSourceMapping_selfManagedKafka(t *testing.T) { testAccCheckEventSourceMappingExists(ctx, resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse), + acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), - acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -1018,12 +1022,13 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te testAccCheckEventSourceMappingExists(ctx, resourceName, &v), resource.TestCheckResourceAttr(resourceName, "batch_size", "100"), resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse), + acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"), resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.0.consumer_group_id", "self-managed-test-group-id"), resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"), - acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"), resource.TestCheckResourceAttr(resourceName, "topics.#", "1"), resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"), ), @@ -1038,6 +1043,53 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te }) } +func TestAccLambdaEventSourceMapping_selfManagedKafkaWithProvisionedPollerConfig(t *testing.T) { + ctx := acctest.Context(t) + var v lambda.GetEventSourceMappingOutput + resourceName := "aws_lambda_event_source_mapping.test" + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, names.LambdaServiceID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckEventSourceMappingDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "123", "null"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "123"), + resource.TestCheckResourceAttrSet(resourceName, "provisioned_poller_config.0.minimum_pollers"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"last_modified"}, + }, + { + Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "150", "15"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "150"), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "15"), + ), + }, + { + Config: testAccEventSourceMappingConfig_selfManagedKafka(rName, "100", "test1:9092,test2:9092"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEventSourceMappingExists(ctx, resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"), + ), + }, + }, + }) +} + func TestAccLambdaEventSourceMapping_activeMQ(t *testing.T) { ctx := acctest.Context(t) if testing.Short() { @@ -2530,6 +2582,52 @@ resource "aws_lambda_event_source_mapping" "test" { `, rName, batchSize, kafkaBootstrapServers)) } +func testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers string) string { + if batchSize == "" { + batchSize = "null" + } + if maxPollers == "" { + maxPollers = "null" + } + if minPollers == "" { + minPollers = "null" + } + + return acctest.ConfigCompose(testAccEventSourceMappingConfig_kafkaBase(rName), fmt.Sprintf(` +resource "aws_lambda_event_source_mapping" "test" { + batch_size = %[2]s + enabled = false + function_name = aws_lambda_function.test.arn + topics = ["test"] + starting_position = "TRIM_HORIZON" + + self_managed_event_source { + endpoints = { + KAFKA_BOOTSTRAP_SERVERS = %[3]q + } + } + + dynamic "source_access_configuration" { + for_each = aws_subnet.test[*].id + content { + type = "VPC_SUBNET" + uri = "subnet:${source_access_configuration.value}" + } + } + + source_access_configuration { + type = "VPC_SECURITY_GROUP" + uri = aws_security_group.test.id + } + + provisioned_poller_config { + maximum_pollers = %[4]s + minimum_pollers = %[5]s + } +} +`, rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers)) +} + func testAccEventSourceMappingConfig_dynamoDBBatchSize(rName, batchSize string) string { if batchSize == "" { batchSize = "null" diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 721513324d3..146ccbc69d3 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -58,6 +58,11 @@ resource "aws_lambda_event_source_mapping" "example" { topics = ["Example"] starting_position = "TRIM_HORIZON" + provisioned_poller_config { + maximum_poller = 80 + minimum_poller = 10 + } + self_managed_event_source { endpoints = { KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9092,kafka2.example.com:9092" @@ -167,6 +172,7 @@ resource "aws_lambda_event_source_mapping" "example" { * `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000. * `metrics_config`: - (Optional) CloudWatch metrics configuration of the event source. Only available for stream sources (DynamoDB and Kinesis) and SQS queues. Detailed below. * `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10. +* `provisioned_poller_config`: - (Optional) Event poller configuration for the event source. Only valid for Amazon MSK or self-managed Apache Kafka sources. Detailed below. * `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. The list must contain exactly one queue name. * `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below. * `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below. @@ -208,6 +214,11 @@ resource "aws_lambda_event_source_mapping" "example" { * `metrics` - (Required) A list containing the metrics to be produced by the event source mapping. Valid values: `EventCount`. +### provisioned_poller_config Configuration Block + +* `maximum_pollers` - (Optional) The maximum number of event pollers this event source can scale up to. The range is between 1 and 2000. +* `minimum_pollers` - (Optional) The minimum number of event pollers this event source can scale down to. The range is between 1 and 200. + ### scaling_config Configuration Block * `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be greater than or equal to `2`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency). You need to raise a [Service Quota Ticket](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) to increase the concurrency beyond 1000.