Skip to content

Commit

Permalink
Merge pull request hashicorp#40303 from lvthillo/f-aws_lambda_event_s…
Browse files Browse the repository at this point in the history
…ource_mapping-add-provisioned-poller-config

feat: add provisioned_poller_config for kafka in lambda event source …
  • Loading branch information
ewbankkit authored Nov 27, 2024
2 parents 398878c + d6119da commit dd0feab
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .changelog/40303.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `provisioned_poller_config` argument
```
77 changes: 77 additions & 0 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,27 @@ func resourceEventSourceMapping() *schema.Resource {
Computed: true,
ValidateFunc: validation.IntBetween(1, 10),
},
"provisioned_poller_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"maximum_pollers": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(1, 2000),
},
"minimum_pollers": {
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntBetween(1, 200),
},
},
},
},
"queues": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -473,6 +494,10 @@ func resourceEventSourceMappingCreate(ctx context.Context, d *schema.ResourceDat
input.ParallelizationFactor = aws.Int32(int32(v.(int)))
}

if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("queues"); ok && len(v.([]interface{})) > 0 {
input.Queues = flex.ExpandStringValueList(v.([]interface{}))
}
Expand Down Expand Up @@ -614,6 +639,13 @@ func resourceEventSourceMappingRead(ctx context.Context, d *schema.ResourceData,
d.Set("metrics_config", nil)
}
d.Set("parallelization_factor", output.ParallelizationFactor)
if v := output.ProvisionedPollerConfig; v != nil {
if err := d.Set("provisioned_poller_config", []interface{}{flattenProvisionedPollerConfig(v)}); err != nil {
return sdkdiag.AppendErrorf(diags, "setting provisioned_poller_config: %s", err)
}
} else {
d.Set("provisioned_poller_config", nil)
}
d.Set("queues", output.Queues)
if v := output.ScalingConfig; v != nil {
if err := d.Set("scaling_config", []interface{}{flattenScalingConfig(v)}); err != nil {
Expand Down Expand Up @@ -742,6 +774,15 @@ func resourceEventSourceMappingUpdate(ctx context.Context, d *schema.ResourceDat
input.ParallelizationFactor = aws.Int32(int32(d.Get("parallelization_factor").(int)))
}

if d.HasChange("provisioned_poller_config") {
if v, ok := d.GetOk("provisioned_poller_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ProvisionedPollerConfig = expandProvisionedPollerConfig(v.([]interface{})[0].(map[string]interface{}))
} else {
// AWS ignores the removal if this is left as nil.
input.ProvisionedPollerConfig = &awstypes.ProvisionedPollerConfig{}
}
}

if d.HasChange("scaling_config") {
if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{}))
Expand Down Expand Up @@ -1144,6 +1185,42 @@ func flattenSelfManagedKafkaEventSourceConfig(apiObject *awstypes.SelfManagedKaf
return tfMap
}

func expandProvisionedPollerConfig(tfMap map[string]interface{}) *awstypes.ProvisionedPollerConfig {
if tfMap == nil {
return nil
}

apiObject := &awstypes.ProvisionedPollerConfig{}

if v, ok := tfMap["maximum_pollers"].(int); ok && v != 0 {
apiObject.MaximumPollers = aws.Int32(int32(v))
}

if v, ok := tfMap["minimum_pollers"].(int); ok && v != 0 {
apiObject.MinimumPollers = aws.Int32(int32(v))
}

return apiObject
}

func flattenProvisionedPollerConfig(apiObject *awstypes.ProvisionedPollerConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.MaximumPollers; v != nil {
tfMap["maximum_pollers"] = aws.ToInt32(v)
}

if v := apiObject.MinimumPollers; v != nil {
tfMap["minimum_pollers"] = aws.ToInt32(v)
}

return tfMap
}

func expandSourceAccessConfiguration(tfMap map[string]interface{}) *awstypes.SourceAccessConfiguration {
if tfMap == nil {
return nil
Expand Down
110 changes: 104 additions & 6 deletions internal/service/lambda/event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestAccLambdaEventSourceMapping_Kinesis_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, names.AttrKMSKeyARN, ""),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "metrics_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "tumbling_window_in_seconds", "0"),
),
},
Expand Down Expand Up @@ -878,10 +879,11 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) {
Config: testAccEventSourceMappingConfig_msk(rName, "100"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand All @@ -903,10 +905,10 @@ func TestAccLambdaEventSourceMapping_msk(t *testing.T) {
Config: testAccEventSourceMappingConfig_msk(rName, "9999"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "batch_size", "9999"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand Down Expand Up @@ -936,11 +938,12 @@ func TestAccLambdaEventSourceMapping_mskWithEventSourceConfig(t *testing.T) {
Config: testAccEventSourceMappingConfig_mskWithEventSourceConfig(rName, "100"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, names.AttrARN),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "amazon_managed_kafka_event_source_config.0.consumer_group_id", "amazon-managed-test-group-id"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand Down Expand Up @@ -973,11 +976,12 @@ func TestAccLambdaEventSourceMapping_selfManagedKafka(t *testing.T) {
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"),
resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand Down Expand Up @@ -1018,12 +1022,13 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "batch_size", "100"),
resource.TestCheckResourceAttr(resourceName, names.AttrEnabled, acctest.CtFalse),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_event_source.0.endpoints.KAFKA_BOOTSTRAP_SERVERS", "test1:9092,test2:9092"),
resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "self_managed_kafka_event_source_config.0.consumer_group_id", "self-managed-test-group-id"),
resource.TestCheckResourceAttr(resourceName, "source_access_configuration.#", "3"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
Expand All @@ -1038,6 +1043,53 @@ func TestAccLambdaEventSourceMapping_selfManagedKafkaWithEventSourceConfig(t *te
})
}

func TestAccLambdaEventSourceMapping_selfManagedKafkaWithProvisionedPollerConfig(t *testing.T) {
ctx := acctest.Context(t)
var v lambda.GetEventSourceMappingOutput
resourceName := "aws_lambda_event_source_mapping.test"
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, names.LambdaServiceID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckEventSourceMappingDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "123", "null"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "123"),
resource.TestCheckResourceAttrSet(resourceName, "provisioned_poller_config.0.minimum_pollers"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, "100", "test1:9092,test2:9092", "150", "15"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "1"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.maximum_pollers", "150"),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.0.minimum_pollers", "15"),
),
},
{
Config: testAccEventSourceMappingConfig_selfManagedKafka(rName, "100", "test1:9092,test2:9092"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "provisioned_poller_config.#", "0"),
),
},
},
})
}

func TestAccLambdaEventSourceMapping_activeMQ(t *testing.T) {
ctx := acctest.Context(t)
if testing.Short() {
Expand Down Expand Up @@ -2530,6 +2582,52 @@ resource "aws_lambda_event_source_mapping" "test" {
`, rName, batchSize, kafkaBootstrapServers))
}

func testAccEventSourceMappingConfig_selfManagedKafkaWithProvisionedPollerConfig(rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers string) string {
if batchSize == "" {
batchSize = "null"
}
if maxPollers == "" {
maxPollers = "null"
}
if minPollers == "" {
minPollers = "null"
}

return acctest.ConfigCompose(testAccEventSourceMappingConfig_kafkaBase(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
batch_size = %[2]s
enabled = false
function_name = aws_lambda_function.test.arn
topics = ["test"]
starting_position = "TRIM_HORIZON"
self_managed_event_source {
endpoints = {
KAFKA_BOOTSTRAP_SERVERS = %[3]q
}
}
dynamic "source_access_configuration" {
for_each = aws_subnet.test[*].id
content {
type = "VPC_SUBNET"
uri = "subnet:${source_access_configuration.value}"
}
}
source_access_configuration {
type = "VPC_SECURITY_GROUP"
uri = aws_security_group.test.id
}
provisioned_poller_config {
maximum_pollers = %[4]s
minimum_pollers = %[5]s
}
}
`, rName, batchSize, kafkaBootstrapServers, maxPollers, minPollers))
}

func testAccEventSourceMappingConfig_dynamoDBBatchSize(rName, batchSize string) string {
if batchSize == "" {
batchSize = "null"
Expand Down
11 changes: 11 additions & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ resource "aws_lambda_event_source_mapping" "example" {
topics = ["Example"]
starting_position = "TRIM_HORIZON"
provisioned_poller_config {
maximum_poller = 80
minimum_poller = 10
}
self_managed_event_source {
endpoints = {
KAFKA_BOOTSTRAP_SERVERS = "kafka1.example.com:9092,kafka2.example.com:9092"
Expand Down Expand Up @@ -167,6 +172,7 @@ resource "aws_lambda_event_source_mapping" "example" {
* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000.
* `metrics_config`: - (Optional) CloudWatch metrics configuration of the event source. Only available for stream sources (DynamoDB and Kinesis) and SQS queues. Detailed below.
* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10.
* `provisioned_poller_config`: - (Optional) Event poller configuration for the event source. Only valid for Amazon MSK or self-managed Apache Kafka sources. Detailed below.
* `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. The list must contain exactly one queue name.
* `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below.
* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below.
Expand Down Expand Up @@ -208,6 +214,11 @@ resource "aws_lambda_event_source_mapping" "example" {

* `metrics` - (Required) A list containing the metrics to be produced by the event source mapping. Valid values: `EventCount`.

### provisioned_poller_config Configuration Block

* `maximum_pollers` - (Optional) The maximum number of event pollers this event source can scale up to. The range is between 1 and 2000.
* `minimum_pollers` - (Optional) The minimum number of event pollers this event source can scale down to. The range is between 1 and 200.

### scaling_config Configuration Block

* `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be greater than or equal to `2`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency). You need to raise a [Service Quota Ticket](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html) to increase the concurrency beyond 1000.
Expand Down

0 comments on commit dd0feab

Please sign in to comment.