From 115f1161bb9051dbe31993720255ed997610993e Mon Sep 17 00:00:00 2001
From: Mathieu Gabelle <54168385+mgabelle@users.noreply.github.com>
Date: Tue, 17 Dec 2024 12:24:23 +0100
Subject: [PATCH] refactor: move all plugin properties to dynamic properties (#566)

* refactor: move Query and abstract connection to dynamic properties

* refactor: move DynamoDB to dynamic properties

* refactor: move Kinesis and eventbridge to dynamic properties

* refactor: move Lambda to dynamic properties

* refactor: move s3 to dynamic properties

* refactor: move Sns to dynamic properties

* refactor: move Sqs to dynamic properties
---
 .../kestra/plugin/aws/AbstractConnection.java |  20 +-
 .../aws/AbstractConnectionInterface.java      |  52 +++-----
 .../io/kestra/plugin/aws/athena/Query.java    |  32 ++--
 .../io/kestra/plugin/aws/auth/EksToken.java   |   6 +-
 .../java/io/kestra/plugin/aws/cli/AwsCLI.java |  19 +--
 .../plugin/aws/dynamodb/AbstractDynamoDb.java |   8 +-
 .../plugin/aws/dynamodb/DeleteItem.java       |  11 +-
 .../kestra/plugin/aws/dynamodb/GetItem.java   |  10 +-
 .../kestra/plugin/aws/dynamodb/PutItem.java   |   2 +-
 .../io/kestra/plugin/aws/dynamodb/Query.java  |  28 ++--
 .../io/kestra/plugin/aws/dynamodb/Scan.java   |  26 ++--
 .../plugin/aws/eventbridge/PutEvents.java     |   6 +-
 .../kestra/plugin/aws/kinesis/PutRecords.java |  24 ++--
 .../io/kestra/plugin/aws/lambda/Invoke.java   |  20 +--
 .../io/kestra/plugin/aws/s3/AbstractS3.java   |   2 +-
 .../plugin/aws/s3/AbstractS3Object.java       |   5 +-
 .../aws/s3/AbstractS3ObjectInterface.java     |   7 +-
 .../kestra/plugin/aws/s3/ActionInterface.java |   4 +-
 .../java/io/kestra/plugin/aws/s3/Copy.java    |  29 ++---
 .../io/kestra/plugin/aws/s3/CreateBucket.java |  45 +++----
 .../java/io/kestra/plugin/aws/s3/Delete.java  |  23 ++--
 .../io/kestra/plugin/aws/s3/DeleteList.java   |  30 ++---
 .../io/kestra/plugin/aws/s3/Download.java     |  18 ++-
 .../io/kestra/plugin/aws/s3/Downloads.java    |  24 ++--
 .../java/io/kestra/plugin/aws/s3/List.java    |  23 ++--
 .../kestra/plugin/aws/s3/ListInterface.java   |  28 ++--
 .../io/kestra/plugin/aws/s3/S3Service.java    |  40 +++---
 .../java/io/kestra/plugin/aws/s3/Trigger.java |  52 ++++----
 .../java/io/kestra/plugin/aws/s3/Upload.java  | 120 ++++++++----
 .../io/kestra/plugin/aws/sns/AbstractSns.java |   4 +-
 .../io/kestra/plugin/aws/sns/Publish.java     |   2 +-
 .../io/kestra/plugin/aws/sqs/AbstractSqs.java |   3 +-
 .../io/kestra/plugin/aws/sqs/Consume.java     |  29 ++---
 .../io/kestra/plugin/aws/sqs/Publish.java     |   2 +-
 .../plugin/aws/sqs/RealtimeTrigger.java       |  56 ++++----
 .../aws/sqs/SqsConnectionInterface.java       |   4 +-
 .../io/kestra/plugin/aws/sqs/Trigger.java     |  39 +++---
 .../kestra/plugin/aws/athena/QueryTest.java   |  12 +-
 .../io/kestra/plugin/aws/cli/AwsCLITest.java  |   6 +-
 .../plugin/aws/dynamodb/DeleteItemTest.java   |  10 +-
 .../plugin/aws/dynamodb/GetItemTest.java      |  10 +-
 .../plugin/aws/dynamodb/PutItemTest.java      |  16 +--
 .../kestra/plugin/aws/dynamodb/QueryTest.java |  72 +++++------
 .../kestra/plugin/aws/dynamodb/ScanTest.java  |  66 +++++-----
 .../plugin/aws/ecr/GetAuthTokenTest.java      |   6 +-
 .../plugin/aws/eventbridge/PutEventsTest.java |  24 ++--
 .../plugin/aws/kinesis/PutRecordsTest.java    |  24 ++--
 .../kestra/plugin/aws/lambda/InvokeTest.java  |  26 ++--
 .../plugin/aws/lambda/InvokeUnitTest.java     |  62 +++++----
 .../io/kestra/plugin/aws/s3/AbstractTest.java |  28 ++--
 .../java/io/kestra/plugin/aws/s3/AllTest.java |  30 ++---
 .../io/kestra/plugin/aws/s3/CopyTest.java     |  18 +--
 .../kestra/plugin/aws/s3/DeleteListTest.java  |   8 +-
 .../kestra/plugin/aws/s3/DownloadsTest.java   |  26 ++--
 .../io/kestra/plugin/aws/s3/ListTest.java     |   9 +-
 .../io/kestra/plugin/aws/s3/TriggerTest.java  |   5 +-
 .../io/kestra/plugin/aws/s3/UploadsTest.java  |  18 +--
 .../io/kestra/plugin/aws/sns/PublishTest.java |   8 +-
 .../plugin/aws/sqs/AbstractSqsTest.java       |   1 +
 .../aws/sqs/PublishThenConsumeTest.java       |  38 +++---
 .../plugin/aws/sqs/RealtimeTriggerTest.java   |   8 +-
 .../io/kestra/plugin/aws/sqs/TriggerTest.java |   8 +-
 62 files changed, 674 insertions(+), 718 deletions(-)

diff --git a/src/main/java/io/kestra/plugin/aws/AbstractConnection.java b/src/main/java/io/kestra/plugin/aws/AbstractConnection.java
index b338207f..54af953b 100644
--- a/src/main/java/io/kestra/plugin/aws/AbstractConnection.java
+++ b/src/main/java/io/kestra/plugin/aws/AbstractConnection.java
@@ -16,21 +16,21 @@ public abstract class AbstractConnection extends Task implements AbstractConnectionInterface {
     protected Property<String> region;
 
-    protected String endpointOverride;
-    protected Boolean compatibilityMode;
+    protected Property<String> endpointOverride;
+    protected Property<Boolean> compatibilityMode;
 
     // Configuration for StaticCredentialsProvider
-    protected String accessKeyId;
-    protected String secretKeyId;
-    protected String sessionToken;
+    protected Property<String> accessKeyId;
+    protected Property<String> secretKeyId;
+    protected Property<String> sessionToken;
 
     // Configuration for AWS STS AssumeRole
-    protected String stsRoleArn;
-    protected String stsRoleExternalId;
-    protected String stsRoleSessionName;
-    protected String stsEndpointOverride;
+    protected Property<String> stsRoleArn;
+    protected Property<String> stsRoleExternalId;
+    protected Property<String> stsRoleSessionName;
+    protected Property<String> stsEndpointOverride;
 
     @Builder.Default
-    protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION;
+    protected Property<Duration> stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION);
 
     /**
      * Common AWS Client configuration properties.
diff --git a/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java b/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java
index 08e9a42c..7097d957 100644
--- a/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java
+++ b/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java
@@ -16,56 +16,48 @@ public interface AbstractConnectionInterface {
         title = "Access Key Id in order to connect to AWS.",
         description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
     )
-    @PluginProperty(dynamic = true)
-    String getAccessKeyId();
+    Property<String> getAccessKeyId();
 
     @Schema(
         title = "Secret Key Id in order to connect to AWS.",
         description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
     )
-    @PluginProperty(dynamic = true)
-    String getSecretKeyId();
+    Property<String> getSecretKeyId();
 
     @Schema(
         title = "AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.",
         description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
) - @PluginProperty(dynamic = true) - String getSessionToken(); + Property getSessionToken(); @Schema( title = "AWS STS Role.", description = "The Amazon Resource Name (ARN) of the role to assume. If set the task will use the `StsAssumeRoleCredentialsProvider`. If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials." ) - @PluginProperty(dynamic = true) - String getStsRoleArn(); + Property getStsRoleArn(); @Schema( title = "AWS STS External Id.", description = " A unique identifier that might be required when you assume a role in another account. This property is only used when an `stsRoleArn` is defined." ) - @PluginProperty(dynamic = true) - String getStsRoleExternalId(); + Property getStsRoleExternalId(); @Schema( title = "AWS STS Session name.", description = "This property is only used when an `stsRoleArn` is defined." ) - @PluginProperty(dynamic = true) - String getStsRoleSessionName(); + Property getStsRoleSessionName(); @Schema( title = "AWS STS Session duration.", description = "The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an `stsRoleArn` is defined." ) - @PluginProperty - Duration getStsRoleSessionDuration(); + Property getStsRoleSessionDuration(); @Schema( title = "The AWS STS endpoint with which the SDKClient should communicate." ) - @PluginProperty(dynamic = true) - String getStsEndpointOverride(); + Property getStsEndpointOverride(); @Schema( title = "AWS region with which the SDK should communicate." @@ -76,26 +68,24 @@ public interface AbstractConnectionInterface { title = "The endpoint with which the SDK should communicate.", description = "This property allows you to use a different S3 compatible storage backend." ) - @PluginProperty(dynamic = true) - String getEndpointOverride(); + Property getEndpointOverride(); - @PluginProperty(dynamic = true) - default Boolean getCompatibilityMode() { - return false; + default Property getCompatibilityMode() { + return Property.of(false); } default AbstractConnection.AwsClientConfig awsClientConfig(final RunContext runContext) throws IllegalVariableEvaluationException { return new AbstractConnection.AwsClientConfig( - runContext.render(this.getAccessKeyId()), - runContext.render(this.getSecretKeyId()), - runContext.render(this.getSessionToken()), - runContext.render(this.getStsRoleArn()), - runContext.render(this.getStsRoleExternalId()), - runContext.render(this.getStsRoleSessionName()), - runContext.render(this.getStsEndpointOverride()), - getStsRoleSessionDuration(), - this.getRegion() == null ? 
null : this.getRegion().as(runContext, String.class), - runContext.render(this.getEndpointOverride()) + runContext.render(this.getAccessKeyId()).as(String.class).orElse(null), + runContext.render(this.getSecretKeyId()).as(String.class).orElse(null), + runContext.render(this.getSessionToken()).as(String.class).orElse(null), + runContext.render(this.getStsRoleArn()).as(String.class).orElse(null), + runContext.render(this.getStsRoleExternalId()).as(String.class).orElse(null), + runContext.render(this.getStsRoleSessionName()).as(String.class).orElse(null), + runContext.render(this.getStsEndpointOverride()).as(String.class).orElse(null), + runContext.render(this.getStsRoleSessionDuration()).as(Duration.class).orElse(null), + runContext.render(this.getRegion()).as(String.class).orElse(null), + runContext.render(this.getEndpointOverride()).as(String.class).orElse(null) ); } } diff --git a/src/main/java/io/kestra/plugin/aws/athena/Query.java b/src/main/java/io/kestra/plugin/aws/athena/Query.java index 19fcb542..8f4ca720 100644 --- a/src/main/java/io/kestra/plugin/aws/athena/Query.java +++ b/src/main/java/io/kestra/plugin/aws/athena/Query.java @@ -5,6 +5,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Output; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.common.FetchType; @@ -49,7 +50,7 @@ title = "Query an Athena table.", description = """ The query will wait for completion, except if fetchMode is set to `NONE`, and will output converted rows. - Row conversion is based on the types listed [here](https://docs.aws.amazon.com/athena/latest/ug/data-types.html). + Row conversion is based on the types listed [here](https://docs.aws.amazon.com/athena/latest/ug/data-types.html). Complex data types like array, map and struct will be converted to a string.""" ) @Plugin( @@ -78,26 +79,22 @@ ) public class Query extends AbstractConnection implements RunnableTask { @Schema(title = "Athena catalog.") - @PluginProperty(dynamic = true) - private String catalog; + private Property catalog; @Schema(title = "Athena database.") @NotNull - @PluginProperty(dynamic = true) - private String database; + private Property database; @Schema( title = "Athena output location.", description = "The query results will be stored in this output location. Must be an existing S3 bucket." 
) @NotNull - @PluginProperty(dynamic = true) - private String outputLocation; + private Property outputLocation; @Schema(title = "Athena SQL query.") @NotNull - @PluginProperty(dynamic = true) - private String query; + private Property query; @Schema( title = "The way you want to store the data.", @@ -107,15 +104,13 @@ public class Query extends AbstractConnection implements RunnableTask fetchType = Property.of(FetchType.STORE); @Schema(title = "Whether to skip the first row which is usually the header.") @NotNull - @PluginProperty @Builder.Default - private boolean skipHeader = true; + private Property skipHeader = Property.of(true); private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); @@ -125,23 +120,24 @@ public class Query extends AbstractConnection implements RunnableTask results = getQueryResultsResults.resultSet().rows(); - if (skipHeader && results != null && !results.isEmpty()) { + if (runContext.render(skipHeader).as(Boolean.class).orElseThrow() && results != null && !results.isEmpty()) { // we skip the first row, this is usually needed as by default Athena returns the header as the first row results = results.subList(1, results.size()); } diff --git a/src/main/java/io/kestra/plugin/aws/auth/EksToken.java b/src/main/java/io/kestra/plugin/aws/auth/EksToken.java index d898c62d..3e6cce56 100644 --- a/src/main/java/io/kestra/plugin/aws/auth/EksToken.java +++ b/src/main/java/io/kestra/plugin/aws/auth/EksToken.java @@ -70,18 +70,18 @@ public Output run(RunContext runContext) throws Exception { if(this.getRegion() == null) { throw new RuntimeException("Region is required"); } - final Region awsRegion = Region.of(this.getRegion().as(runContext, String.class)); + final Region awsRegion = Region.of(runContext.render(this.getRegion()).as(String.class).orElseThrow()); SdkHttpFullRequest requestToSign = SdkHttpFullRequest .builder() .method(SdkHttpMethod.GET) .uri(getStsRegionalEndpointUri(runContext, awsRegion)) - .appendHeader("x-k8s-aws-id", this.clusterName.as(runContext, String.class)) + .appendHeader("x-k8s-aws-id", runContext.render(this.clusterName).as(String.class).orElseThrow()) .appendRawQueryParameter("Action", "GetCallerIdentity") .appendRawQueryParameter("Version", "2011-06-15") .build(); - ZonedDateTime expirationDate = ZonedDateTime.now().plusSeconds(expirationDuration.as(runContext, Long.class)); + ZonedDateTime expirationDate = ZonedDateTime.now().plusSeconds(runContext.render(expirationDuration).as(Long.class).orElseThrow()); Aws4PresignerParams presignerParams = Aws4PresignerParams.builder() .awsCredentials(ConnectionUtils.credentialsProvider(this.awsClientConfig(runContext)).resolveCredentials()) .signingRegion(awsRegion) diff --git a/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java b/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java index fba20e28..b05e7342 100644 --- a/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java +++ b/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java @@ -20,6 +20,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -148,19 +149,19 @@ public ScriptOutput run(RunContext runContext) throws Exception { // hack for missing env vars supports: https://github.com/aws/aws-cli/issues/5639 if (this.stsRoleArn != null) { - allCommands.add("aws configure set role_arn " + runContext.render(this.stsRoleArn)); + allCommands.add("aws configure set role_arn " + 
runContext.render(this.stsRoleArn).as(String.class).orElseThrow()); } if (this.stsRoleSessionName != null) { - allCommands.add("aws configure set role_session_name " + runContext.render(this.stsRoleSessionName)); + allCommands.add("aws configure set role_session_name " + runContext.render(this.stsRoleSessionName).as(String.class).orElseThrow()); } if (this.stsRoleExternalId != null) { - allCommands.add("aws configure set external_id " + runContext.render(this.stsRoleExternalId)); + allCommands.add("aws configure set external_id " + runContext.render(this.stsRoleExternalId).as(String.class).orElseThrow()); } if (this.stsRoleSessionDuration != null) { - allCommands.add("aws configure set duration_seconds " + stsRoleSessionDuration.getSeconds()); + allCommands.add("aws configure set duration_seconds " + runContext.render(stsRoleSessionDuration).as(Duration.class).orElseThrow().getSeconds()); } if (this.stsCredentialSource != null) { @@ -206,23 +207,23 @@ private Map getEnv(RunContext runContext) throws IllegalVariable Map envs = new HashMap<>(); if (this.accessKeyId != null) { - envs.put("AWS_ACCESS_KEY_ID", runContext.render(this.accessKeyId)); + envs.put("AWS_ACCESS_KEY_ID", runContext.render(this.accessKeyId).as(String.class).orElseThrow()); } if (this.secretKeyId != null) { - envs.put("AWS_SECRET_ACCESS_KEY", runContext.render(this.secretKeyId)); + envs.put("AWS_SECRET_ACCESS_KEY", runContext.render(this.secretKeyId).as(String.class).orElseThrow()); } if (this.region != null) { - envs.put("AWS_DEFAULT_REGION", this.region.as(runContext, String.class)); + envs.put("AWS_DEFAULT_REGION", runContext.render(this.region).as(String.class).orElseThrow()); } if (this.sessionToken != null) { - envs.put("AWS_SESSION_TOKEN", runContext.render(this.sessionToken)); + envs.put("AWS_SESSION_TOKEN", runContext.render(this.sessionToken).as(String.class).orElseThrow()); } if (this.endpointOverride != null) { - envs.put("AWS_ENDPOINT_URL", runContext.render(this.endpointOverride)); + envs.put("AWS_ENDPOINT_URL", runContext.render(this.endpointOverride).as(String.class).orElseThrow()); } envs.put("AWS_DEFAULT_OUTPUT", this.outputFormat.toString()); diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java b/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java index 003f3934..7a4c2af0 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java @@ -3,6 +3,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.common.FetchOutput; import io.kestra.core.models.tasks.common.FetchType; import io.kestra.core.runners.RunContext; @@ -40,9 +41,8 @@ @NoArgsConstructor public abstract class AbstractDynamoDb extends AbstractConnection { @Schema(title = "The DynamoDB table name.") - @PluginProperty(dynamic = true) @NotNull - protected String tableName; + protected Property tableName; protected DynamoDbClient client(final RunContext runContext) throws IllegalVariableEvaluationException { final AwsClientConfig clientConfig = awsClientConfig(runContext); @@ -109,7 +109,7 @@ protected AttributeValue objectFrom(Object value) { return AttributeValue.fromS(value.toString()); } - protected FetchOutput fetchOutputs(List> items, FetchType fetchType, RunContext runContext) throws IOException { + 
protected FetchOutput fetchOutputs(List> items, FetchType fetchType, RunContext runContext) throws IOException, IllegalVariableEvaluationException { var outputBuilder = FetchOutput.builder(); switch (fetchType) { case FETCH: @@ -139,7 +139,7 @@ protected FetchOutput fetchOutputs(List> items, Fetc runContext.metric(Counter.of( "records", output.getSize(), - "tableName", getTableName() + "tableName", runContext.render(getTableName()).as(String.class).orElseThrow() )); return output; diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/DeleteItem.java b/src/main/java/io/kestra/plugin/aws/dynamodb/DeleteItem.java index 32c4bb6c..99f0979d 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/DeleteItem.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/DeleteItem.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.VoidOutput; import io.kestra.core.runners.RunContext; @@ -41,7 +42,7 @@ secretKeyId: "" region: "eu-central-1" tableName: "persons" - key: + key: id: "1" """ ) @@ -52,16 +53,16 @@ public class DeleteItem extends AbstractDynamoDb implements RunnableTask key; + private Property> key; @Override public VoidOutput run(RunContext runContext) throws Exception { try (var dynamoDb = client(runContext)) { - Map key = valueMapFrom(getKey()); + var renderedKey = runContext.render(this.key).asMap(String.class, Object.class); + Map key = valueMapFrom(renderedKey); var deleteRequest = DeleteItemRequest.builder() - .tableName(runContext.render(this.getTableName())) + .tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow()) .key(key) .build(); diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/GetItem.java b/src/main/java/io/kestra/plugin/aws/dynamodb/GetItem.java index aeb5cc18..b934b093 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/GetItem.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/GetItem.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -37,7 +38,7 @@ secretKeyId: "" region: "eu-central-1" tableName: "persons" - key: + key: id: "1" """ ) @@ -48,16 +49,15 @@ public class GetItem extends AbstractDynamoDb implements RunnableTask key; + private Property> key; @Override public Output run(RunContext runContext) throws Exception { try (var dynamoDb = client(runContext)) { - Map key = valueMapFrom(runContext.render(this.key)); + Map key = valueMapFrom(runContext.render(this.key).asMap(String.class, Object.class)); var getRequest = GetItemRequest.builder() - .tableName(runContext.render(this.tableName)) + .tableName(runContext.render(this.tableName).as(String.class).orElseThrow()) .key(key) .build(); diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/PutItem.java b/src/main/java/io/kestra/plugin/aws/dynamodb/PutItem.java index 0b8cc806..cd57d926 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/PutItem.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/PutItem.java @@ -84,7 +84,7 @@ public VoidOutput run(RunContext runContext) throws Exception { var item = 
valueMapFrom(fields); var putRequest = PutItemRequest.builder() - .tableName(runContext.render(this.tableName)) + .tableName(runContext.render(this.tableName).as(String.class).orElseThrow()) .item(item) .build(); dynamoDb.putItem(putRequest); diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/Query.java b/src/main/java/io/kestra/plugin/aws/dynamodb/Query.java index bed24836..29154bf5 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/Query.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/Query.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.common.FetchOutput; import io.kestra.core.models.tasks.common.FetchType; @@ -75,56 +76,51 @@ public class Query extends AbstractDynamoDb implements RunnableTask + "NONE do nothing." ) @Builder.Default - @PluginProperty - private FetchType fetchType = FetchType.STORE; + private Property fetchType = Property.of(FetchType.STORE); @Schema( title = "Maximum numbers of returned results." ) - @PluginProperty - private Integer limit; + private Property limit; @Schema( title = "Query key condition expression." ) - @PluginProperty(dynamic = true) @NotNull - private String keyConditionExpression; + private Property keyConditionExpression; @Schema( title = "Query expression attributes.", description = "It's a map of string -> object." ) - @PluginProperty(dynamic = true) @NotNull - private Map expressionAttributeValues; + private Property> expressionAttributeValues; @Schema( title = "Query filter expression.", description = "Query filter expression." ) - @PluginProperty(dynamic = true) - private String filterExpression; + private Property filterExpression; @Override public FetchOutput run(RunContext runContext) throws Exception { try (var dynamoDb = client(runContext)) { var queryBuilder = QueryRequest.builder() - .tableName(runContext.render(this.getTableName())) - .keyConditionExpression(runContext.render(keyConditionExpression)) - .expressionAttributeValues(valueMapFrom(expressionAttributeValues)); + .tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow()) + .keyConditionExpression(runContext.render(keyConditionExpression).as(String.class).orElseThrow()) + .expressionAttributeValues(valueMapFrom(runContext.render(expressionAttributeValues).asMap(String.class, Object.class))); if(limit != null) { - queryBuilder.limit(limit); + queryBuilder.limit(runContext.render(limit).as(Integer.class).orElseThrow()); } if(filterExpression != null){ - queryBuilder.filterExpression(runContext.render(filterExpression)); + queryBuilder.filterExpression(runContext.render(filterExpression).as(String.class).orElseThrow()); } var query = queryBuilder.build(); var items = dynamoDb.query(query).items(); - return this.fetchOutputs(items, this.fetchType, runContext); + return this.fetchOutputs(items, runContext.render(this.fetchType).as(FetchType.class).orElseThrow(), runContext); } } } diff --git a/src/main/java/io/kestra/plugin/aws/dynamodb/Scan.java b/src/main/java/io/kestra/plugin/aws/dynamodb/Scan.java index 4e4e9e62..5c48c341 100644 --- a/src/main/java/io/kestra/plugin/aws/dynamodb/Scan.java +++ b/src/main/java/io/kestra/plugin/aws/dynamodb/Scan.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import 
io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.common.FetchOutput; import io.kestra.core.models.tasks.common.FetchType; @@ -71,52 +72,49 @@ public class Scan extends AbstractDynamoDb implements RunnableTask + "NONE do nothing." ) @Builder.Default - @PluginProperty - private FetchType fetchType = FetchType.STORE; + private Property fetchType = Property.of(FetchType.STORE); @Schema( title = "Maximum numbers of returned results." ) - @PluginProperty - private Integer limit; + private Property limit; @Schema( title = "Scan filter expression.", description = "When used, `expressionAttributeValues` property must also be provided." ) - @PluginProperty(dynamic = true) - private String filterExpression; + private Property filterExpression; @Schema( title = "Scan expression attributes.", description = "It's a map of string -> object." ) - @PluginProperty(dynamic = true) - private Map expressionAttributeValues; + private Property> expressionAttributeValues; @Override public FetchOutput run(RunContext runContext) throws Exception { try (var dynamoDb = client(runContext)) { var scanBuilder = ScanRequest.builder() - .tableName(runContext.render(this.getTableName())); + .tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow()); if(limit != null) { - scanBuilder.limit(limit); + scanBuilder.limit(runContext.render(limit).as(Integer.class).orElseThrow()); } if(filterExpression != null){ - if(expressionAttributeValues == null){ + var attributes = runContext.render(expressionAttributeValues).asMap(String.class, Object.class); + if(attributes.isEmpty()){ throw new IllegalArgumentException("'expressionAttributeValues' must be provided when 'filterExpression' is used"); } - scanBuilder.filterExpression(runContext.render(filterExpression)); - scanBuilder.expressionAttributeValues(valueMapFrom(expressionAttributeValues)); + scanBuilder.filterExpression(runContext.render(filterExpression).as(String.class).orElseThrow()); + scanBuilder.expressionAttributeValues(valueMapFrom(attributes)); } var scan = scanBuilder.build(); var items = dynamoDb.scan(scan).items(); - return this.fetchOutputs(items, this.fetchType, runContext); + return this.fetchOutputs(items, runContext.render(this.fetchType).as(FetchType.class).orElseThrow(), runContext); } } } diff --git a/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java b/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java index 2708a4cc..1e00f0be 100644 --- a/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java +++ b/src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java @@ -10,6 +10,7 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; @@ -96,14 +97,13 @@ public class PutEvents extends AbstractConnection implements RunnableTask failOnUnsuccessfulEvents = Property.of(true); @PluginProperty(dynamic = true) @NotNull @@ -129,7 +129,7 @@ public PutEvents.Output run(RunContext runContext) throws Exception { runContext.metric(Counter.of("entryCount", entryList.size())); // Fail if failOnUnsuccessfulEvents - if (failOnUnsuccessfulEvents && putEventsResponse.failedEntryCount() > 0) { + if 
(runContext.render(failOnUnsuccessfulEvents).as(Boolean.class).orElseThrow() && putEventsResponse.failedEntryCount() > 0) { var logger = runContext.logger(); logger.error("Response show {} event failed: {}", putEventsResponse.failedEntryCount(), putEventsResponse); throw new RuntimeException(String.format("Response show %d event failed: %s", putEventsResponse.failedEntryCount(), putEventsResponse)); diff --git a/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java b/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java index b6346515..e082a645 100644 --- a/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java +++ b/src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java @@ -11,6 +11,7 @@ import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Timer; import io.kestra.core.models.flows.State; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; @@ -82,7 +83,7 @@ secretKeyId: "" region: "eu-central-1" streamName: "mystream" - records: kestra:///myfile.ion + records: kestra:///myfile.ion """ ) } @@ -94,28 +95,25 @@ public class PutRecords extends AbstractConnection implements RunnableTask failOnUnsuccessfulRecords = Property.of(true); - @PluginProperty(dynamic = true) @Schema( title = "The name of the stream to push the records.", description = "Make sure to set either `streamName` or `streamArn`. One of those must be provided." ) - private String streamName; + private Property streamName; - @PluginProperty(dynamic = true) @Schema( title = "The ARN of the stream to push the records.", description = "Make sure to set either `streamName` or `streamArn`. One of those must be provided." 
) - private String streamArn; + private Property streamArn; @PluginProperty(dynamic = true) @Schema( @@ -135,7 +133,7 @@ public Output run(RunContext runContext) throws Exception { PutRecordsResponse putRecordsResponse = putRecords(runContext, records); // Fail if failOnUnsuccessfulRecords - if (failOnUnsuccessfulRecords && putRecordsResponse.failedRecordCount() > 0) { + if (runContext.render(failOnUnsuccessfulRecords).as(Boolean.class).orElseThrow() && putRecordsResponse.failedRecordCount() > 0) { var logger = runContext.logger(); logger.error("Response show {} record failed: {}", putRecordsResponse.failedRecordCount(), putRecordsResponse); throw new RuntimeException(String.format("Response show %d record failed: %s", putRecordsResponse.failedRecordCount(), putRecordsResponse)); @@ -159,10 +157,12 @@ private PutRecordsResponse putRecords(RunContext runContext, List record try (KinesisClient client = client(runContext)) { PutRecordsRequest.Builder builder = PutRecordsRequest.builder(); - if (!Strings.isNullOrEmpty(streamArn)) { - builder.streamARN(streamArn); - } else if (!Strings.isNullOrEmpty(streamName)) { - builder.streamName(streamName); + var renderedStreamArn = runContext.render(streamArn).as(String.class).orElse(null); + var renderedStreamName = runContext.render(streamName).as(String.class).orElse(null); + if (!Strings.isNullOrEmpty(renderedStreamArn)) { + builder.streamARN(renderedStreamArn); + } else if (!Strings.isNullOrEmpty(renderedStreamName)) { + builder.streamName(renderedStreamName); } else { throw new IllegalArgumentException("Either streamName or streamArn has to be set."); } diff --git a/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java b/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java index bbb5b569..fc4f40fb 100644 --- a/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java +++ b/src/main/java/io/kestra/plugin/aws/lambda/Invoke.java @@ -10,6 +10,7 @@ import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; import io.kestra.core.models.executions.metrics.Timer; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; @@ -98,22 +99,23 @@ public class Invoke extends AbstractConnection implements RunnableTask { private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson(); @Schema(title = "The Lambda function name.") - @PluginProperty(dynamic = true) @NotNull - private String functionArn; + private Property functionArn; @Schema( title = "Function request payload.", description = "Request payload. It's a map of string -> object." - ) - @PluginProperty(dynamic = true) - private Map functionPayload; + ) + private Property> functionPayload; @Override public Output run(RunContext runContext) throws Exception { final long start = System.nanoTime(); - var functionArn = runContext.render(this.functionArn); - var requestPayload = this.functionPayload != null ? runContext.render(this.functionPayload) : null; + var functionArn = runContext.render(this.functionArn).as(String.class).orElseThrow(); + var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ? 
+ null : + runContext.render(this.functionPayload).asMap(String.class, Object.class); + try (var lambda = client(runContext)) { var builder = InvokeRequest.builder().functionName(functionArn); if (requestPayload != null && requestPayload.size() > 0) { @@ -205,7 +207,7 @@ void handleError(String functionArn, ContentType contentType, SdkBytes payload) log.debug("Lambda function error for {}: response type: {}, response payload: {}", functionArn, contentType, errorPayload); } - if (errorPayload != null + if (errorPayload != null && ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) { throw new LambdaInvokeException( "Lambda Invoke task responded with error for function: " + functionArn @@ -229,7 +231,7 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont runContext.metric(Counter.of("file.size", size)); var uri = runContext.storage().putFile(tempFile); if (log.isDebugEnabled()) { - log.debug("Lambda invokation task completed {}: response type: {}, file: `{}", + log.debug("Lambda invokation task completed {}: response type: {}, file: `{}", functionArn, contentType, uri); } return Output.builder() diff --git a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java index 0f7cb07e..4f62fb52 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java +++ b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java @@ -20,7 +20,7 @@ default S3Client client(final RunContext runContext) throws IllegalVariableEvalu default S3AsyncClient asyncClient(final RunContext runContext) throws IllegalVariableEvaluationException { final AbstractConnection.AwsClientConfig clientConfig = awsClientConfig(runContext); - if (this.getCompatibilityMode()) { + if (runContext.render(this.getCompatibilityMode()).as(Boolean.class).orElse(false)) { return ConnectionUtils.configureAsyncClient(clientConfig, S3AsyncClient.builder()).build(); } else { S3CrtAsyncClientBuilder s3ClientBuilder = S3AsyncClient.crtBuilder() diff --git a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java index 94d5eedc..ebef4dfa 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java +++ b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java @@ -1,5 +1,6 @@ package io.kestra.plugin.aws.s3; +import io.kestra.core.models.property.Property; import io.kestra.plugin.aws.AbstractConnection; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -13,9 +14,9 @@ @Getter @NoArgsConstructor public abstract class AbstractS3Object extends AbstractConnection implements AbstractS3ObjectInterface { - protected String requestPayer; + protected Property requestPayer; - protected String bucket; + protected Property bucket; static { // Initializing CRT will download the S3 native library into /tmp. 
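Note: every hunk in this patch applies the same migration. A plain `String`/`Boolean`/`Duration`/`Map` field annotated with `@PluginProperty(dynamic = true)` becomes a `Property<T>` that is rendered through the `RunContext` at execution time. The following is a minimal sketch of the resulting pattern, reusing only the rendering calls visible in the hunks above; the task class and field names are illustrative and are not part of this change set.

// Illustrative sketch only, not part of the patch. Class and field names are hypothetical;
// the Property<T> declarations and render(...) calls mirror those introduced above.
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.RunContext;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.util.Map;

@SuperBuilder
@Getter
@NoArgsConstructor
public class ExampleTask extends AbstractConnection implements RunnableTask<VoidOutput> {

    // Before this refactor: `private String bucket;` annotated with @PluginProperty(dynamic = true).
    @NotNull
    private Property<String> bucket;

    // Defaults are now wrapped in Property.of(...).
    @Builder.Default
    private Property<Boolean> compatibilityMode = Property.of(false);

    // Map-typed properties follow the same pattern and render through asMap(...).
    private Property<Map<String, Object>> labels;

    @Override
    public VoidOutput run(RunContext runContext) throws Exception {
        // Required value: fail fast if rendering produces nothing.
        String bucketName = runContext.render(this.bucket).as(String.class).orElseThrow();

        // Optional value: fall back to a default instead of throwing.
        boolean compatibility = runContext.render(this.compatibilityMode).as(Boolean.class).orElse(false);

        // An unset map property renders to an empty map rather than null.
        Map<String, Object> renderedLabels = runContext.render(this.labels).asMap(String.class, Object.class);

        // ... use bucketName, compatibility and renderedLabels with the AWS SDK client ...
        return null;
    }
}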
diff --git a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java
index 0dbfcda6..76b19b62 100644
--- a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java
+++ b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java
@@ -1,6 +1,7 @@
 package io.kestra.plugin.aws.s3;
 
 import io.kestra.core.models.annotations.PluginProperty;
+import io.kestra.core.models.property.Property;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.constraints.NotNull;
 
@@ -8,13 +9,11 @@ public interface AbstractS3ObjectInterface extends AbstractS3 {
     @Schema(
         title = "The S3 bucket name."
     )
-    @PluginProperty(dynamic = true)
     @NotNull
-    String getBucket();
+    Property<String> getBucket();
 
     @Schema(
         title = "Sets the value of the RequestPayer property for this object."
     )
-    @PluginProperty(dynamic = true)
-    String getRequestPayer();
+    Property<String> getRequestPayer();
 }
diff --git a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java
index 9d7190ea..e32259e8 100644
--- a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java
+++ b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java
@@ -1,6 +1,7 @@
 package io.kestra.plugin.aws.s3;
 
 import io.kestra.core.models.annotations.PluginProperty;
+import io.kestra.core.models.property.Property;
 import io.swagger.v3.oas.annotations.media.Schema;
 import jakarta.validation.constraints.NotNull;
 
@@ -9,9 +10,8 @@ public interface ActionInterface {
     @Schema(
         title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering."
     )
-    @PluginProperty(dynamic = true)
     @NotNull
-    ActionInterface.Action getAction();
+    Property<ActionInterface.Action> getAction();
 
     @Schema(
         title = "The destination bucket and key for `MOVE` action."
diff --git a/src/main/java/io/kestra/plugin/aws/s3/Copy.java b/src/main/java/io/kestra/plugin/aws/s3/Copy.java
index fc438395..433cc882 100644
--- a/src/main/java/io/kestra/plugin/aws/s3/Copy.java
+++ b/src/main/java/io/kestra/plugin/aws/s3/Copy.java
@@ -3,6 +3,7 @@
 import io.kestra.core.models.annotations.Example;
 import io.kestra.core.models.annotations.Plugin;
 import io.kestra.core.models.annotations.PluginProperty;
+import io.kestra.core.models.property.Property;
 import io.kestra.core.models.tasks.RunnableTask;
 import io.kestra.core.runners.RunContext;
 import io.kestra.plugin.aws.AbstractConnection;
@@ -62,27 +63,26 @@ public class Copy extends AbstractConnection implements AbstractS3, RunnableTask
     @Schema(
         title = "Whether to delete the source file after download."
     )
-    @PluginProperty
     @Builder.Default
-    private Boolean delete = false;
+    private Property<Boolean> delete = Property.of(false);
 
     @Override
     public Output run(RunContext runContext) throws Exception {
         try (S3Client client = this.client(runContext)) {
             CopyObjectRequest.Builder builder = CopyObjectRequest.builder()
-                .sourceBucket(runContext.render(this.from.bucket))
-                .sourceKey(runContext.render(this.from.key))
-                .destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket))
-                .destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key));
+                .sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow())
+                .sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow())
+                .destinationBucket(runContext.render(this.to.bucket != null ?
this.to.bucket : this.from.bucket).as(String.class).orElseThrow()) + .destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key).as(String.class).orElseThrow()); if (this.from.versionId != null) { - builder.sourceVersionId(runContext.render(this.from.versionId)); + builder.sourceVersionId(runContext.render(this.from.versionId).as(String.class).orElseThrow()); } CopyObjectRequest request = builder.build(); CopyObjectResponse response = client.copyObject(request); - if (this.delete) { + if (runContext.render(this.delete).as(Boolean.class).orElseThrow()) { Delete.builder() .id(this.id) .type(Delete.class.getName()) @@ -96,8 +96,8 @@ public Output run(RunContext runContext) throws Exception { .stsRoleSessionDuration(this.stsRoleSessionDuration) .stsRoleArn(this.stsRoleArn) .stsEndpointOverride(this.stsEndpointOverride) - .bucket(request.sourceBucket()) - .key(request.sourceKey()) + .bucket(Property.of(request.sourceBucket())) + .key(Property.of(request.sourceKey())) .build() .run(runContext); } @@ -118,16 +118,14 @@ public static class CopyObject { @Schema( title = "The bucket name" ) - @PluginProperty(dynamic = true) @NotNull - String bucket; + Property bucket; @Schema( title = "The bucket key" ) - @PluginProperty(dynamic = true) @NotNull - String key; + Property key; } @SuperBuilder(toBuilder = true) @@ -137,8 +135,7 @@ public static class CopyObjectFrom extends CopyObject { @Schema( title = "The specific version of the object." ) - @PluginProperty(dynamic = true) - private String versionId; + private Property versionId; } @SuperBuilder diff --git a/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java b/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java index 31bc0ee6..5c75f5cb 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java +++ b/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.AbstractConnection; @@ -46,91 +47,79 @@ public class CreateBucket extends AbstractConnection implements AbstractS3, Runn @Schema( description = "The S3 bucket name to create." ) - @PluginProperty(dynamic = true) @NotNull - private String bucket; + private Property bucket; @Schema( description = "Allows grantee the read, write, read ACP, and write ACP permissions on the bucket." ) - @PluginProperty(dynamic = true) - private String grantFullControl; + private Property grantFullControl; @Schema( title = "Allows grantee to list the objects in the bucket." ) - @PluginProperty(dynamic = true) - private String grantRead; + private Property grantRead; @Schema( title = "Allows grantee to list the ACL for the applicable bucket." ) - @PluginProperty(dynamic = true) - private String grantReadACP; + private Property grantReadACP; @Schema( title = "Allows grantee to create, overwrite, and delete any object in the bucket." ) - @PluginProperty(dynamic = true) - private String grantWrite; + private Property grantWrite; @Schema( title = "Allows grantee to write the ACL for the applicable bucket." ) - @PluginProperty(dynamic = true) - private String grantWriteACP; + private Property grantWriteACP; @Schema( title = "The canned ACL to apply to the bucket." 
) - @PluginProperty(dynamic = true) - private String acl; + private Property acl; @Schema( title = "Specifies whether you want S3 Object Lock to be enabled for the new bucket." ) - @PluginProperty - private Boolean objectLockEnabledForBucket; + private Property objectLockEnabledForBucket; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); try (S3Client client = this.client(runContext)) { CreateBucketRequest.Builder builder = CreateBucketRequest.builder() .bucket(bucket); if (grantFullControl != null) { - builder.grantFullControl(runContext.render(this.grantFullControl)); + builder.grantFullControl(runContext.render(this.grantFullControl).as(String.class).orElseThrow()); } if (grantRead != null) { - builder.grantRead(runContext.render(this.grantRead)); + builder.grantRead(runContext.render(this.grantRead).as(String.class).orElseThrow()); } if (grantReadACP != null) { - builder.grantReadACP(runContext.render(this.grantReadACP)); + builder.grantReadACP(runContext.render(this.grantReadACP).as(String.class).orElseThrow()); } if (grantWrite != null) { - builder.grantWrite(runContext.render(this.grantWrite)); + builder.grantWrite(runContext.render(this.grantWrite).as(String.class).orElseThrow()); } if (grantWriteACP != null) { - builder.grantWriteACP(runContext.render(this.grantWriteACP)); + builder.grantWriteACP(runContext.render(this.grantWriteACP).as(String.class).orElseThrow()); } if (acl != null) { - builder.acl(runContext.render(this.acl)); + builder.acl(runContext.render(this.acl).as(String.class).orElseThrow()); } if (objectLockEnabledForBucket != null) { - builder.objectLockEnabledForBucket(this.objectLockEnabledForBucket); - } - - if (objectLockEnabledForBucket != null) { - builder.objectLockEnabledForBucket(this.objectLockEnabledForBucket); + builder.objectLockEnabledForBucket(runContext.render(this.objectLockEnabledForBucket).as(Boolean.class).orElseThrow()); } CreateBucketResponse response = client.createBucket(builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Delete.java b/src/main/java/io/kestra/plugin/aws/s3/Delete.java index a5b0ba07..6b341b4d 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Delete.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Delete.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -45,15 +46,13 @@ public class Delete extends AbstractS3Object implements RunnableTask key; @Schema( title = "Indicates whether S3 Object Lock should bypass Governance-mode restrictions to process this operation." ) - @PluginProperty - private Boolean bypassGovernanceRetention; + private Property bypassGovernanceRetention; @Schema( title = "The concatenation of the authentication device's serial number, a space, and the value that is displayed on " + @@ -61,19 +60,17 @@ public class Delete extends AbstractS3Object implements RunnableTask mfa; @Schema( description = "Sets the value of the RequestPayer property for this object." 
) - @PluginProperty(dynamic = true) - private String requestPayer; + private Property requestPayer; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); try (S3Client client = client(runContext)) { DeleteObjectRequest.Builder builder = DeleteObjectRequest.builder() @@ -81,15 +78,15 @@ public Output run(RunContext runContext) throws Exception { .key(key); if (this.bypassGovernanceRetention != null) { - builder.bypassGovernanceRetention(this.bypassGovernanceRetention); + builder.bypassGovernanceRetention(runContext.render(this.bypassGovernanceRetention).as(Boolean.class).orElseThrow()); } if (this.mfa != null) { - builder.mfa(runContext.render(this.mfa)); + builder.mfa(runContext.render(this.mfa).as(String.class).orElseThrow()); } if (this.requestPayer != null) { - builder.requestPayer(runContext.render(this.requestPayer)); + builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow()); } DeleteObjectResponse response = client.deleteObject(builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java b/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java index 5874f698..b6544d55 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java +++ b/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.s3.models.S3Object; @@ -54,23 +55,23 @@ title = "Delete a list of keys on a S3 bucket." 
) public class DeleteList extends AbstractS3Object implements RunnableTask, ListInterface { - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); @Min(2) @Schema( @@ -82,14 +83,13 @@ public class DeleteList extends AbstractS3Object implements RunnableTask errorOnEmpty = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); try (S3Client client = this.client(runContext)) { @@ -122,11 +122,11 @@ public Output run(RunContext runContext) throws Exception { runContext.metric(Counter.of("count", finalResult.getLeft())); runContext.metric(Counter.of("size", finalResult.getRight())); - if (errorOnEmpty && finalResult.getLeft() == 0) { + if (runContext.render(errorOnEmpty).as(Boolean.class).orElseThrow() && finalResult.getLeft() == 0) { throw new NoSuchElementException("Unable to find any files to delete on " + - runContext.render(this.bucket) + " " + - "with regexp='" + runContext.render(this.regexp) + "', " + - "prefix='" + runContext.render(this.prefix) + "'" + runContext.render(this.bucket).as(String.class).orElseThrow() + " " + + "with regexp='" + runContext.render(this.regexp).as(String.class).orElse(null) + "', " + + "prefix='" + runContext.render(this.prefix).as(String.class).orElse(null) + "'" ); } diff --git a/src/main/java/io/kestra/plugin/aws/s3/Download.java b/src/main/java/io/kestra/plugin/aws/s3/Download.java index 01e006dd..89ebf240 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Download.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Download.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -50,27 +51,24 @@ public class Download extends AbstractS3Object implements RunnableTask key; @Schema( title = "The specific version of the object." ) - @PluginProperty(dynamic = true) - protected String versionId; + protected Property versionId; @Schema( title = "If set to true, the task will use the AWS S3 DefaultAsyncClient instead of the S3CrtAsyncClient, which better integrates with S3-compatible services but restricts uploads and downloads to 2GB." 
) - @PluginProperty @Builder.Default - private Boolean compatibilityMode = false; + private Property compatibilityMode = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); try (S3AsyncClient client = this.asyncClient(runContext)) { GetObjectRequest.Builder builder = GetObjectRequest.builder() @@ -78,11 +76,11 @@ public Output run(RunContext runContext) throws Exception { .key(key); if (this.versionId != null) { - builder.versionId(runContext.render(this.versionId)); + builder.versionId(runContext.render(this.versionId).as(String.class).orElseThrow()); } if (this.requestPayer != null) { - builder.requestPayer(runContext.render(this.requestPayer)); + builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow()); } Pair download = S3Service.download(runContext, client, builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java index 2af7d907..848a84a7 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.s3.models.S3Object; @@ -51,33 +52,32 @@ title = "Downloads multiple files from a S3 bucket." ) public class Downloads extends AbstractS3Object implements RunnableTask, ListInterface, ActionInterface { - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); @Schema( title = "This property will use the AWS S3 DefaultAsyncClient instead of the S3CrtAsyncClient, which maximizes compatibility with S3-compatible services but restricts uploads and downloads to 2GB." 
) - @PluginProperty @Builder.Default - private Boolean compatibilityMode = false; + private Property compatibilityMode = Property.of(false); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); - private ActionInterface.Action action; + private Property action; private Copy.CopyObject moveTo; @@ -115,7 +115,7 @@ public Output run(RunContext runContext) throws Exception { .stream() .map(throwFunction(object -> { GetObjectRequest.Builder builder = GetObjectRequest.builder() - .bucket(runContext.render(bucket)) + .bucket(runContext.render(bucket).as(String.class).orElseThrow()) .key(object.getKey()); Pair download = S3Service.download(runContext, client, builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/List.java b/src/main/java/io/kestra/plugin/aws/s3/List.java index 3dc8f473..228aa974 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/List.java +++ b/src/main/java/io/kestra/plugin/aws/s3/List.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.s3.models.S3Object; @@ -41,23 +42,23 @@ title = "List keys on a S3 bucket." ) public class List extends AbstractS3Object implements RunnableTask, ListInterface { - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); @Override public Output run(RunContext runContext) throws Exception { @@ -69,9 +70,9 @@ public Output run(RunContext runContext) throws Exception { runContext.logger().debug( "Found '{}' keys on {} with regexp='{}', prefix={}", list.size(), - runContext.render(bucket), - runContext.render(regexp), - runContext.render(prefix) + runContext.render(bucket).as(String.class).orElseThrow(), + runContext.render(regexp).as(String.class).orElse(null), + runContext.render(prefix).as(String.class).orElse(null) ); return Output.builder() diff --git a/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java b/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java index b23f961b..4a3e3662 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java +++ b/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.s3; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; @@ -9,48 +10,41 @@ public interface ListInterface { @Schema( title = "The S3 bucket where to download the file." 
) - @PluginProperty(dynamic = true) @NotNull - String getBucket(); + Property getBucket(); @Schema( title = "Limits the response to keys that begin with the specified prefix." ) - @PluginProperty(dynamic = true) - String getPrefix(); + Property getPrefix(); @Schema( title = "A delimiter is a character you use to group keys." ) - @PluginProperty(dynamic = true) - String getDelimiter(); + Property getDelimiter(); @Schema( title = "Marker is where you want Amazon S3 to start listing from.", description = "Amazon S3 starts listing after this specified key. Marker can be any key in the bucket." ) - @PluginProperty(dynamic = true) - String getMarker(); + Property getMarker(); @Schema( title = "The EncodingType property for this object." ) - @PluginProperty(dynamic = true) - String getEncodingType(); + Property getEncodingType(); @Schema( title = "Sets the maximum number of keys returned in the response.", description = "By default, the action returns up to 1,000 key names. The response might contain fewer keys but will never contain more." ) - @PluginProperty(dynamic = true) - Integer getMaxKeys(); + Property getMaxKeys(); @Schema( title = "The account ID of the expected bucket owner.", description = "If the bucket is owned by a different account, the request fails with the HTTP status code 403 Forbidden (access denied)." ) - @PluginProperty(dynamic = true) - String getExpectedBucketOwner(); + Property getExpectedBucketOwner(); @Schema( title = "A regexp to filter on full key.", @@ -58,14 +52,12 @@ public interface ListInterface { "`regExp: .*` to match all files\n"+ "`regExp: .*2020-01-0.\\\\.csv` to match files between 01 and 09 of january ending with `.csv`" ) - @PluginProperty(dynamic = true) - String getRegexp(); + Property getRegexp(); @Schema( title = "The type of objects to filter: files, directory, or both." 
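The interface getters here change contract: instead of returning a rendered `String` (or `Filter`) annotated with `@PluginProperty`, they now hand back the raw `Property` and leave rendering to the caller, which is how the `S3Service` changes further down resolve `getFilter()`. A small sketch of that split, with hypothetical `FilterHolder`/`SampleFilter` names standing in for the real interface and enum:

    // Illustrative interface/consumer pair showing the getter contract change:
    // implementations expose the raw Property, callers render it to the enum.
    import io.kestra.core.models.property.Property;
    import io.kestra.core.runners.RunContext;

    interface FilterHolder {
        Property<SampleFilter> getFilter();

        enum SampleFilter { FILES, DIRECTORY, BOTH }
    }

    class FilterConsumer {
        FilterHolder.SampleFilter resolve(RunContext runContext, FilterHolder holder) throws Exception {
            // Enum-typed properties render the same way as strings, just with the enum class.
            return runContext.render(holder.getFilter()).as(FilterHolder.SampleFilter.class).orElseThrow();
        }
    }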
) - @PluginProperty - Filter getFilter(); + Property getFilter(); enum Filter { diff --git a/src/main/java/io/kestra/plugin/aws/s3/S3Service.java b/src/main/java/io/kestra/plugin/aws/s3/S3Service.java index f0fb05d4..637014e3 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/S3Service.java +++ b/src/main/java/io/kestra/plugin/aws/s3/S3Service.java @@ -2,6 +2,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.FileUtils; import io.kestra.plugin.aws.AbstractConnectionInterface; @@ -27,6 +28,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static io.kestra.core.utils.Rethrow.throwPredicate; + public class S3Service { public static void initCrt() { @@ -58,14 +61,15 @@ public static Pair download(RunContext runContext, S3Asy static void performAction( java.util.List s3Objects, - ActionInterface.Action action, + Property action, Copy.CopyObject moveTo, RunContext runContext, AbstractS3ObjectInterface abstractS3Object, AbstractConnectionInterface abstractS3, AbstractConnectionInterface abstractConnection ) throws Exception { - if (action == ActionInterface.Action.DELETE) { + var renderedAction = runContext.render(action).as(ActionInterface.Action.class).orElseThrow(); + if (renderedAction == ActionInterface.Action.DELETE) { for (S3Object object : s3Objects) { Delete delete = Delete.builder() .id("archive") @@ -74,7 +78,7 @@ static void performAction( .endpointOverride(abstractS3.getEndpointOverride()) .accessKeyId(abstractConnection.getAccessKeyId()) .secretKeyId(abstractConnection.getSecretKeyId()) - .key(object.getKey()) + .key(Property.of(object.getKey())) .bucket(abstractS3Object.getBucket()) .stsRoleArn(abstractConnection.getStsRoleArn()) .stsRoleExternalId(abstractConnection.getStsRoleExternalId()) @@ -84,7 +88,7 @@ static void performAction( .build(); delete.run(runContext); } - } else if (action == ActionInterface.Action.MOVE) { + } else if (renderedAction == ActionInterface.Action.MOVE) { for (S3Object object : s3Objects) { Copy copy = Copy.builder() .id("archive") @@ -100,16 +104,16 @@ static void performAction( .stsEndpointOverride(abstractConnection.getStsEndpointOverride()) .from(Copy.CopyObjectFrom.builder() .bucket(abstractS3Object.getBucket()) - .key(object.getKey()) + .key(Property.of(object.getKey())) .build() ) .to(moveTo.toBuilder() - .key(StringUtils.stripEnd(moveTo.getKey() + "/", "/") + .key(Property.of(StringUtils.stripEnd(moveTo.getKey() + "/", "/") + "/" + FilenameUtils.getName(object.getKey()) - ) + )) .build() ) - .delete(true) + .delete(Property.of(true)) .build(); copy.run(runContext); } @@ -118,41 +122,41 @@ static void performAction( public static List list(RunContext runContext, S3Client client, ListInterface list, AbstractS3Object abstractS3) throws IllegalVariableEvaluationException { ListObjectsRequest.Builder builder = ListObjectsRequest.builder() - .bucket(runContext.render(list.getBucket())) - .maxKeys(list.getMaxKeys()); + .bucket(runContext.render(list.getBucket()).as(String.class).orElseThrow()) + .maxKeys(runContext.render(list.getMaxKeys()).as(Integer.class).orElse(1000)); if (list.getPrefix() != null) { - builder.prefix(runContext.render(list.getPrefix())); + builder.prefix(runContext.render(list.getPrefix()).as(String.class).orElseThrow()); } if (list.getDelimiter() != null) { - 
builder.delimiter(runContext.render(list.getDelimiter())); + builder.delimiter(runContext.render(list.getDelimiter()).as(String.class).orElseThrow()); } if (list.getMarker() != null) { - builder.marker(runContext.render(list.getMarker())); + builder.marker(runContext.render(list.getMarker()).as(String.class).orElseThrow()); } if (list.getEncodingType() != null) { - builder.encodingType(runContext.render(list.getEncodingType())); + builder.encodingType(runContext.render(list.getEncodingType()).as(String.class).orElseThrow()); } if (list.getExpectedBucketOwner() != null) { - builder.expectedBucketOwner(runContext.render(list.getExpectedBucketOwner())); + builder.expectedBucketOwner(runContext.render(list.getExpectedBucketOwner()).as(String.class).orElseThrow()); } if (abstractS3.getRequestPayer() != null) { - builder.requestPayer(runContext.render(abstractS3.getRequestPayer())); + builder.requestPayer(runContext.render(abstractS3.getRequestPayer()).as(String.class).orElseThrow()); } - String regExp = runContext.render(list.getRegexp()); + String regExp = runContext.render(list.getRegexp()).as(String.class).orElse(null); ListObjectsResponse listObjectsResponse = client.listObjects(builder.build()); return listObjectsResponse .contents() .stream() - .filter(s3Object -> S3Service.filter(s3Object, regExp, list.getFilter())) + .filter(throwPredicate(s3Object -> S3Service.filter(s3Object, regExp, runContext.render(list.getFilter()).as(ListInterface.Filter.class).orElseThrow()))) .map(S3Object::of) .collect(Collectors.toList()); } diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index 06e13bdf..b238ba4b 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -40,7 +40,7 @@ code = """ id: s3_listen namespace: company.team - + tasks: - id: each type: io.kestra.plugin.core.flow.ForEach @@ -49,7 +49,7 @@ - id: return type: io.kestra.plugin.core.debug.Return format: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.aws.s3.Trigger @@ -71,7 +71,7 @@ code = """ id: s3_listen namespace: company.team - + tasks: - id: each type: io.kestra.plugin.core.flow.ForEach @@ -80,7 +80,7 @@ - id: return type: io.kestra.plugin.core.debug.Return format: "{{ taskrun.value }}" - + - id: delete type: io.kestra.plugin.aws.s3.Delete accessKeyId: "" @@ -88,7 +88,7 @@ region: "eu-central-1" bucket: "my-bucket" key: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.aws.s3.Trigger @@ -107,49 +107,49 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, @Builder.Default private final Duration interval = Duration.ofSeconds(60); - protected String accessKeyId; + protected Property accessKeyId; - protected String secretKeyId; + protected Property secretKeyId; - protected String sessionToken; + protected Property sessionToken; protected Property region; - protected String endpointOverride; + protected Property endpointOverride; - protected String requestPayer; + protected Property requestPayer; - protected String bucket; + protected Property bucket; - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - private String expectedBucketOwner; + private Property 
expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); - private ActionInterface.Action action; + private Property action; private Copy.CopyObject moveTo; // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); @Override public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { @@ -204,7 +204,7 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo .stsEndpointOverride(this.stsEndpointOverride) .requestPayer(this.requestPayer) .bucket(this.bucket) - .key(object.getKey()) + .key(Property.of(object.getKey())) .build(); Download.Output downloadOutput = download.run(runContext); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Upload.java b/src/main/java/io/kestra/plugin/aws/s3/Upload.java index 5e1e8045..72b8b5bd 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Upload.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Upload.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; @@ -45,7 +46,7 @@ inputs: - id: myfile type: FILE - + tasks: - id: upload type: io.kestra.plugin.aws.s3.Upload @@ -76,142 +77,121 @@ public class Upload extends AbstractS3Object implements RunnableTask key; @Schema( title = "A map of metadata to store with the object in S3." ) - @PluginProperty(dynamic = true) - private Map metadata; + private Property> metadata; @Schema( title = "Can be used to specify caching behavior along the request/response chain." ) - @PluginProperty(dynamic = true) - private String cacheControl; + private Property cacheControl; @Schema( title = "A standard MIME type describing the format of the contents." ) - @PluginProperty(dynamic = true) - private String contentType; + private Property contentType; @Schema( title = "Specifies what content encodings have been applied to the object.", description = "And thus, what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field." ) - @PluginProperty(dynamic = true) - private String contentEncoding; + private Property contentEncoding; @Schema( title = "Specifies presentational information for the object." ) - @PluginProperty(dynamic = true) - private String contentDisposition; + private Property contentDisposition; @Schema( title = "The language the content is in." ) - @PluginProperty(dynamic = true) - private String contentLanguage; + private Property contentLanguage; @Schema( title = "The size of the body in bytes.", description = "This parameter is useful when the size of the body cannot be determined automatically." 
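When a trigger builds a sub-task (as the S3 `Trigger` does with `Download` above), its own `Property` fields are now passed through untouched so the sub-task renders them itself, and values computed at runtime are wrapped in `Property.of(...)`. A sketch of that delegation pattern, with a hypothetical `DownloadLike` builder standing in for the real task:

    // Illustrative delegation: Property fields pass through unrendered,
    // runtime values get wrapped in Property.of(...).
    import io.kestra.core.models.property.Property;
    import lombok.Builder;

    class DelegationSketch {
        @Builder
        static class DownloadLike {
            private Property<String> bucket;
            private Property<String> accessKeyId;
            private Property<String> key;
        }

        private Property<String> bucket;
        private Property<String> accessKeyId;

        DownloadLike buildDownload(String discoveredKey) {
            return DownloadLike.builder()
                .bucket(this.bucket)              // passed through; the sub-task renders it
                .accessKeyId(this.accessKeyId)    // passed through; the sub-task renders it
                .key(Property.of(discoveredKey))  // runtime value wrapped on the spot
                .build();
        }
    }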
     )
-    @PluginProperty
-    private Long contentLength;
+    private Property<Long> contentLength;

     @Schema(
         title = "The date and time after which the object is no longer cacheable."
     )
-    @PluginProperty(dynamic = true)
-    private String expires;
+    private Property<String> expires;

     @Schema(
         title = "The canned ACL to apply to the object."
     )
-    @PluginProperty(dynamic = true)
-    private String acl;
+    private Property<String> acl;

     @Schema(
         title = "If you don't specify, S3 Standard is the default storage class. Amazon S3 supports other storage classes."
     )
-    @PluginProperty
-    private StorageClass storageClass;
+    private Property<StorageClass> storageClass;

     @Schema(
         title = "The server-side encryption algorithm used when storing this object in Amazon S3.",
         description = "For example, AES256, aws:kms, aws:kms:dsse"
     )
-    @PluginProperty
-    private ServerSideEncryption serverSideEncryption;
+    private Property<ServerSideEncryption> serverSideEncryption;

     @Schema(
         title = "Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption with server-side encryption using Key Management Service (KMS) keys (SSE-KMS).",
         description = "Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object encryption with SSE-KMS."
     )
-    @PluginProperty
-    private Boolean bucketKeyEnabled;
+    private Property<Boolean> bucketKeyEnabled;

     @Schema(
         title = "Indicates the algorithm used to create the checksum for the object when using the SDK."
     )
-    @PluginProperty
-    private ChecksumAlgorithm checksumAlgorithm;
+    private Property<ChecksumAlgorithm> checksumAlgorithm;

     @Schema(
         title = "The account ID of the expected bucket owner.",
         description = "If the bucket is owned by a different account, the request fails " +
             "with the HTTP status code `403 Forbidden` (access denied)."
     )
-    @PluginProperty(dynamic = true)
-    private String expectedBucketOwner;
+    private Property<String> expectedBucketOwner;

     @Schema(
         title = "The Object Lock mode that you want to apply to this object."
     )
-    @PluginProperty
-    private ObjectLockMode objectLockMode;
+    private Property<ObjectLockMode> objectLockMode;

     @Schema(
         title = "Specifies whether a legal hold will be applied to this object."
     )
-    @PluginProperty
-    private ObjectLockLegalHoldStatus objectLockLegalHoldStatus;
+    private Property<ObjectLockLegalHoldStatus> objectLockLegalHoldStatus;

     @Schema(
         title = "The date and time when you want this object's Object Lock to expire."
     )
-    @PluginProperty(dynamic = true)
-    private String objectLockRetainUntilDate;
+    private Property<String> objectLockRetainUntilDate;

     @Schema(
         title = "The checksum data integrity check to verify that the data received is the same data that was originally sent.",
         description = "Must be used together with `checksumAlgorithm` to define the expected algorithm of these values."
     )
-    @PluginProperty(dynamic = true)
-    private String checksum;
+    private Property<String> checksum;

     @Schema(
         title = "The tag-set for the object."
     )
-    @PluginProperty
-    private Map<String, String> tagging;
+    private Property<Map<String, String>> tagging;

     @Schema(
         title = "This property will use the AWS S3 DefaultAsyncClient instead of the S3CrtAsyncClient, which maximizes compatibility with S3-compatible services but restricts uploads and downloads to 2GB. For some S3 endpoints such as CloudFlare R2, you may need to set this value to `true`."
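In the `run()` hunks that follow, the typed fields above are resolved at execution time: maps via `asMap(...)`, enums via `as(EnumType.class)`, and date strings via `Instant.parse` on the rendered value. A compressed sketch of those conversions, with illustrative class and enum names:

    // Illustrative rendering of non-string property types, mirroring the run() hunks below.
    import io.kestra.core.models.property.Property;
    import io.kestra.core.runners.RunContext;

    import java.time.Instant;
    import java.util.Map;

    class TypedRenderSketch {
        enum SampleStorageClass { STANDARD, GLACIER }

        private Property<Map<String, String>> metadata;
        private Property<SampleStorageClass> storageClass;
        private Property<String> expires;

        void resolve(RunContext runContext) throws Exception {
            // Maps render with asMap(keyType, valueType) instead of the old renderMap(...).
            Map<String, String> renderedMetadata = runContext.render(this.metadata).asMap(String.class, String.class);
            // Enums render with as(EnumType.class); dates stay String properties and are parsed afterwards.
            SampleStorageClass sc = runContext.render(this.storageClass).as(SampleStorageClass.class).orElseThrow();
            Instant expiry = Instant.parse(runContext.render(this.expires).as(String.class).orElseThrow());
        }
    }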
) - @PluginProperty @Builder.Default - private Boolean compatibilityMode = false; + private Property compatibilityMode = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); try (S3AsyncClient client = this.asyncClient(runContext)) { PutObjectRequest.Builder builder = PutObjectRequest @@ -220,86 +200,88 @@ public Output run(RunContext runContext) throws Exception { .key(key); if (this.requestPayer != null) { - builder.requestPayer(runContext.render(this.requestPayer)); + builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow()); } if (this.metadata != null) { - builder.metadata(runContext.renderMap(this.metadata)); + builder.metadata(runContext.render(this.metadata).asMap(String.class, String.class)); } if (this.cacheControl != null) { - builder.cacheControl(runContext.render(this.cacheControl)); + builder.cacheControl(runContext.render(this.cacheControl).as(String.class).orElseThrow()); } if (this.contentType != null) { - builder.contentType(runContext.render(this.contentType)); + builder.contentType(runContext.render(this.contentType).as(String.class).orElseThrow()); } if (this.contentEncoding != null) { - builder.contentEncoding(runContext.render(this.contentEncoding)); + builder.contentEncoding(runContext.render(this.contentEncoding).as(String.class).orElseThrow()); } if (this.contentDisposition != null) { - builder.contentDisposition(runContext.render(this.contentDisposition)); + builder.contentDisposition(runContext.render(this.contentDisposition).as(String.class).orElseThrow()); } if (this.contentLanguage != null) { - builder.contentLanguage(runContext.render(this.contentLanguage)); + builder.contentLanguage(runContext.render(this.contentLanguage).as(String.class).orElseThrow()); } if (this.contentLength != null) { - builder.contentLength(this.contentLength); + builder.contentLength(runContext.render(this.contentLength).as(Long.class).orElseThrow()); } if (this.expires != null) { - builder.expires(Instant.parse(runContext.render(this.expires))); + builder.expires(Instant.parse(runContext.render(this.expires).as(String.class).orElseThrow())); } if (this.acl != null) { - builder.acl(runContext.render(this.acl)); + builder.acl(runContext.render(this.acl).as(String.class).orElseThrow()); } if (this.storageClass != null) { - builder.storageClass(this.storageClass); + builder.storageClass(runContext.render(this.storageClass).as(StorageClass.class).orElseThrow()); } if (this.serverSideEncryption != null) { - builder.serverSideEncryption(this.serverSideEncryption); + builder.serverSideEncryption(runContext.render(this.serverSideEncryption).as(ServerSideEncryption.class).orElseThrow()); } if (this.bucketKeyEnabled != null) { - builder.bucketKeyEnabled(this.bucketKeyEnabled); + builder.bucketKeyEnabled(runContext.render(this.bucketKeyEnabled).as(Boolean.class).orElseThrow()); } if (this.checksumAlgorithm != null) { - builder.checksumAlgorithm(this.checksumAlgorithm); - switch (this.checksumAlgorithm) { - case SHA1 -> builder.checksumSHA1(runContext.render(this.checksum)); - case SHA256 -> builder.checksumSHA256(runContext.render(this.checksum)); - case CRC32 -> builder.checksumCRC32(runContext.render(this.checksum)); - case CRC32_C -> 
builder.checksumCRC32C(runContext.render(this.checksum)); + var renderedAlgorithm = runContext.render(this.checksumAlgorithm).as(ChecksumAlgorithm.class).orElseThrow(); + var sum = runContext.render(this.checksum).as(String.class).orElse(null); + builder.checksumAlgorithm(renderedAlgorithm); + switch (renderedAlgorithm) { + case SHA1 -> builder.checksumSHA1(sum); + case SHA256 -> builder.checksumSHA256(sum); + case CRC32 -> builder.checksumCRC32(sum); + case CRC32_C -> builder.checksumCRC32C(sum); } } if (this.expectedBucketOwner != null) { - builder.expectedBucketOwner(runContext.render(this.expectedBucketOwner)); + builder.expectedBucketOwner(runContext.render(this.expectedBucketOwner).as(String.class).orElseThrow()); } if (this.objectLockMode != null) { - builder.objectLockMode(this.objectLockMode); + builder.objectLockMode(runContext.render(this.objectLockMode).as(ObjectLockMode.class).orElseThrow()); } if (this.objectLockLegalHoldStatus != null) { - builder.objectLockLegalHoldStatus(this.objectLockLegalHoldStatus); + builder.objectLockLegalHoldStatus(runContext.render(this.objectLockLegalHoldStatus).as(ObjectLockLegalHoldStatus.class).orElseThrow()); } if (this.objectLockRetainUntilDate != null) { - builder.objectLockRetainUntilDate(Instant.parse(runContext.render(this.objectLockRetainUntilDate))); + builder.objectLockRetainUntilDate(Instant.parse(runContext.render(this.objectLockRetainUntilDate).as(String.class).orElseThrow())); } if (this.tagging != null) { builder.tagging(Tagging.builder() - .tagSet(runContext.renderMap(this.tagging) + .tagSet(runContext.render(this.tagging).asMap(String.class, String.class) .entrySet() .stream() .map(e -> Tag.builder() diff --git a/src/main/java/io/kestra/plugin/aws/sns/AbstractSns.java b/src/main/java/io/kestra/plugin/aws/sns/AbstractSns.java index ee559f76..6614f1cc 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/AbstractSns.java +++ b/src/main/java/io/kestra/plugin/aws/sns/AbstractSns.java @@ -2,6 +2,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.AbstractConnection; import io.kestra.plugin.aws.ConnectionUtils; @@ -21,9 +22,8 @@ @NoArgsConstructor abstract class AbstractSns extends AbstractConnection { @Schema(title = "The SNS topic ARN. 
The topic must already exist.") - @PluginProperty(dynamic = true) @NotNull - private String topicArn; + private Property topicArn; protected SnsClient client(final RunContext runContext) throws IllegalVariableEvaluationException { final AwsClientConfig clientConfig = awsClientConfig(runContext); diff --git a/src/main/java/io/kestra/plugin/aws/sns/Publish.java b/src/main/java/io/kestra/plugin/aws/sns/Publish.java index cf8b958f..950a667d 100644 --- a/src/main/java/io/kestra/plugin/aws/sns/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sns/Publish.java @@ -69,7 +69,7 @@ public class Publish extends AbstractSns implements RunnableTask @SuppressWarnings("unchecked") @Override public Publish.Output run(RunContext runContext) throws Exception { - var topicArn = runContext.render(getTopicArn()); + var topicArn = runContext.render(getTopicArn()).as(String.class).orElseThrow(); try (var snsClient = this.client(runContext)) { Integer count; Flux flowable; diff --git a/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java b/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java index 8200585a..e09240eb 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/AbstractSqs.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.sqs; import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.AbstractConnection; import io.kestra.plugin.aws.ConnectionUtils; @@ -26,7 +27,7 @@ abstract class AbstractSqs extends AbstractConnection implements SqsConnectionIn private static final Duration RETRY_STRATEGY_BACKOFF_BASE_DELAY = Duration.ofMillis(50); private static final Duration RETRY_STRATEGY_BACKOFF_MAX_DELAY = Duration.ofMillis(300); - private String queueUrl; + private Property queueUrl; protected SqsClient client(final RunContext runContext) throws IllegalVariableEvaluationException { final AwsClientConfig clientConfig = awsClientConfig(runContext); diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Consume.java b/src/main/java/io/kestra/plugin/aws/sqs/Consume.java index 9e243e8c..004a39ab 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Consume.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Consume.java @@ -1,20 +1,18 @@ package io.kestra.plugin.aws.sqs; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.FileSerde; -import io.kestra.plugin.aws.sqs.model.Message; import io.kestra.plugin.aws.sqs.model.SerdeType; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; import lombok.*; import lombok.experimental.SuperBuilder; -import reactor.core.publisher.Flux; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; @@ -57,24 +55,21 @@ ) public class Consume extends AbstractSqs implements RunnableTask { - @PluginProperty @Schema(title = "Maximum number of records; when reached, the task will end.") - private Integer maxRecords; + 
private Property maxRecords; - @PluginProperty @Schema(title = "Maximum duration in the Duration ISO format, after that the task will end.") - private Duration maxDuration; + private Property maxDuration; @Builder.Default - @PluginProperty @NotNull @Schema(title = "The serializer/deserializer to use.") - private SerdeType serdeType = SerdeType.STRING; + private Property serdeType = Property.of(SerdeType.STRING); @SuppressWarnings("BusyWait") @Override public Output run(RunContext runContext) throws Exception { - var queueUrl = runContext.render(getQueueUrl()); + var queueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); if (this.maxDuration == null && this.maxRecords == null) { throw new IllegalArgumentException("'maxDuration' or 'maxRecords' must be set to avoid an infinite loop"); } @@ -92,7 +87,7 @@ public Output run(RunContext runContext) throws Exception { .build(); var msg = sqsClient.receiveMessage(receiveRequest); msg.messages().forEach(throwConsumer(m -> { - FileSerde.write(outputFile, serdeType.deserialize(m.body())); + FileSerde.write(outputFile, runContext.render(serdeType).as(SerdeType.class).orElseThrow().deserialize(m.body())); sqsClient.deleteMessage(DeleteMessageRequest.builder() .queueUrl(queueUrl) .receiptHandle(m.receiptHandle()).build() @@ -101,7 +96,7 @@ public Output run(RunContext runContext) throws Exception { })); Thread.sleep(100); - } while (!this.ended(total, started)); + } while (!this.ended(total, started, runContext)); runContext.metric(Counter.of("records", total.get(), "queue", queueUrl)); outputFile.flush(); @@ -114,12 +109,14 @@ public Output run(RunContext runContext) throws Exception { } } - private boolean ended(AtomicInteger count, ZonedDateTime start) { - if (this.maxRecords != null && count.get() >= this.maxRecords) { + private boolean ended(AtomicInteger count, ZonedDateTime start, RunContext runContext) throws IllegalVariableEvaluationException { + var max = runContext.render(this.maxRecords).as(Integer.class); + if (max.isPresent() && count.get() >= max.get()) { return true; } - if (this.maxDuration != null && ZonedDateTime.now().toEpochSecond() > start.plus(this.maxDuration).toEpochSecond()) { + var duration = runContext.render(this.maxDuration).as(Duration.class); + if (duration.isPresent() && ZonedDateTime.now().toEpochSecond() > start.plus(duration.get()).toEpochSecond()) { return true; } diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java index c5a22ed5..a077209e 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Publish.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Publish.java @@ -72,7 +72,7 @@ public class Publish extends AbstractSqs implements RunnableTask @SuppressWarnings("unchecked") @Override public Output run(RunContext runContext) throws Exception { - var queueUrl = runContext.render(getQueueUrl()); + var queueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); try (var sqsClient = this.client(runContext)) { Integer count; Flux flowable; diff --git a/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java index 724925c5..1a522b46 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java @@ -54,58 +54,54 @@ accessKeyId: "access_key" secretKeyId: "secret_key" region: "eu-central-1" - queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue""" + queueUrl: 
https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue""" ) } ) public class RealtimeTrigger extends AbstractTrigger implements RealtimeTriggerInterface, TriggerOutput, SqsConnectionInterface { - private String queueUrl; + private Property queueUrl; - private String accessKeyId; + private Property accessKeyId; - private String secretKeyId; + private Property secretKeyId; - private String sessionToken; + private Property sessionToken; private Property region; - private String endpointOverride; + private Property endpointOverride; @Builder.Default - @PluginProperty @NotNull @Schema(title = "The serializer/deserializer to use.") - private SerdeType serdeType = SerdeType.STRING; + private Property serdeType = Property.of(SerdeType.STRING); // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); // Default read timeout is 20s, so we cannot use a bigger wait time, or we would need to increase the read timeout. - @PluginProperty @Schema(title = "The duration for which the SQS client waits for a message.") @Builder.Default - protected Duration waitTime = Duration.ofSeconds(20); + protected Property waitTime = Property.of(Duration.ofSeconds(20)); - @PluginProperty @Schema( title = "The maximum number of messages returned from request made to SQS.", description = "Increasing this value can reduce the number of requests made to SQS. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10." ) @Builder.Default - protected Integer maxNumberOfMessage = 5; + protected Property maxNumberOfMessage = Property.of(5); - @PluginProperty @Schema( title = "The maximum number of attempts used by the SQS client's retry strategy." 
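Durations and counters follow the same scheme: defaults are wrapped in `Property.of(...)` and the rendered `Optional` replaces the old null checks, as in the reworked `Consume.ended(...)` above. A small sketch of such a stop condition, with illustrative names:

    // Illustrative stop condition: limits stay unset-able, and the rendered Optionals
    // replace the old null checks on plain Integer/Duration fields.
    import io.kestra.core.models.property.Property;
    import io.kestra.core.runners.RunContext;

    import java.time.Duration;
    import java.time.ZonedDateTime;
    import java.util.Optional;

    class LimitSketch {
        private Property<Integer> maxRecords;
        private Property<Duration> maxDuration;

        boolean ended(RunContext runContext, int consumed, ZonedDateTime started) throws Exception {
            Optional<Integer> max = runContext.render(this.maxRecords).as(Integer.class);
            if (max.isPresent() && consumed >= max.get()) {
                return true;
            }
            // Durations render like any other typed property.
            Optional<Duration> duration = runContext.render(this.maxDuration).as(Duration.class);
            return duration.isPresent() && ZonedDateTime.now().isAfter(started.plus(duration.get()));
        }
    }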
) @Builder.Default - protected Integer clientRetryMaxAttempts = 3; + protected Property clientRetryMaxAttempts = Property.of(3); @Builder.Default @Getter(AccessLevel.NONE) @@ -120,12 +116,12 @@ public Publisher evaluate(ConditionContext conditionContext, TriggerC RunContext runContext = conditionContext.getRunContext(); Consume task = Consume.builder() - .queueUrl(runContext.render(queueUrl)) - .accessKeyId(runContext.render(accessKeyId)) - .secretKeyId(runContext.render(secretKeyId)) - .sessionToken(runContext.render(sessionToken)) + .queueUrl(queueUrl) + .accessKeyId(accessKeyId) + .secretKeyId(secretKeyId) + .sessionToken(sessionToken) .region(region) - .endpointOverride(runContext.render(endpointOverride)) + .endpointOverride(endpointOverride) .serdeType(this.serdeType) .stsRoleArn(this.stsRoleArn) .stsRoleSessionName(this.stsRoleSessionName) @@ -140,16 +136,16 @@ public Publisher evaluate(ConditionContext conditionContext, TriggerC public Flux publisher(final Consume task, final RunContext runContext) throws Exception { - var queueUrl = runContext.render(getQueueUrl()); + var renderedQueueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); return Flux.create( fluxSink -> { - try (SqsAsyncClient sqsClient = task.asyncClient(runContext, clientRetryMaxAttempts)) { + try (SqsAsyncClient sqsClient = task.asyncClient(runContext, runContext.render(clientRetryMaxAttempts).as(Integer.class).orElseThrow())) { while (isActive.get()) { ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .waitTimeSeconds((int)waitTime.toSeconds()) - .maxNumberOfMessages(maxNumberOfMessage) + .queueUrl(renderedQueueUrl) + .waitTimeSeconds((int) runContext.render(waitTime).as(Duration.class).orElseThrow().toSeconds()) + .maxNumberOfMessages(runContext.render(maxNumberOfMessage).as(Integer.class).orElseThrow()) .build(); sqsClient.receiveMessage(receiveRequest) @@ -162,7 +158,7 @@ public Flux publisher(final Consume task, }); messageResponse.messages().forEach(message -> sqsClient.deleteMessage(DeleteMessageRequest.builder() - .queueUrl(queueUrl) + .queueUrl(renderedQueueUrl) .receiptHandle(message.receiptHandle()) .build() ) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/SqsConnectionInterface.java b/src/main/java/io/kestra/plugin/aws/sqs/SqsConnectionInterface.java index 7516f319..5e3c2278 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/SqsConnectionInterface.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/SqsConnectionInterface.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.sqs; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.plugin.aws.AbstractConnectionInterface; import io.swagger.v3.oas.annotations.media.Schema; @@ -8,7 +9,6 @@ public interface SqsConnectionInterface extends AbstractConnectionInterface { @Schema(title = "The SQS queue URL. 
The queue must already exist.") - @PluginProperty(dynamic = true) @NotNull - String getQueueUrl(); + Property getQueueUrl(); } diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java b/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java index 2b70f76a..96a17da4 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java @@ -55,42 +55,39 @@ ) public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput, SqsConnectionInterface { - private String queueUrl; + private Property queueUrl; - private String accessKeyId; + private Property accessKeyId; - private String secretKeyId; + private Property secretKeyId; - private String sessionToken; + private Property sessionToken; private Property region; - private String endpointOverride; + private Property endpointOverride; @Builder.Default private final Duration interval = Duration.ofSeconds(60); - @PluginProperty @Schema(title = "Max number of records, when reached the task will end.") - private Integer maxRecords; + private Property maxRecords; - @PluginProperty @Schema(title = "Max duration in the Duration ISO format, after that the task will end.") - private Duration maxDuration; + private Property maxDuration; @Builder.Default - @PluginProperty @NotNull @Schema(title = "The serializer/deserializer to use.") - private SerdeType serdeType = SerdeType.STRING; + private Property serdeType = Property.of(SerdeType.STRING); // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); @Override public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { @@ -98,12 +95,12 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo Logger logger = runContext.logger(); Consume task = Consume.builder() - .queueUrl(runContext.render(queueUrl)) - .accessKeyId(runContext.render(accessKeyId)) - .secretKeyId(runContext.render(secretKeyId)) - .sessionToken(runContext.render(sessionToken)) + .queueUrl(queueUrl) + .accessKeyId(accessKeyId) + .secretKeyId(secretKeyId) + .sessionToken(sessionToken) .region(region) - .endpointOverride(runContext.render(endpointOverride)) + .endpointOverride(endpointOverride) .maxRecords(this.maxRecords) .maxDuration(this.maxDuration) .serdeType(this.serdeType) diff --git a/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java b/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java index 04f6741a..b7877182 100644 --- a/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java +++ b/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java @@ -38,12 +38,12 @@ void run() throws Exception { .id("hello") .type(Query.class.getName()) .region(Property.of("eu-west-3")) - .accessKeyId(accessKey) - .secretKeyId(secretKey) - .database("units") - .fetchType(FetchType.FETCH) - .outputLocation("s3://kestra-unit-test") - .query("select * from types") + .accessKeyId(Property.of(accessKey)) + .secretKeyId(Property.of(secretKey)) + .database(Property.of("units")) + 
.fetchType(Property.of(FetchType.FETCH)) + .outputLocation(Property.of("s3://kestra-unit-test")) + .query(Property.of("select * from types")) .build(); var output = query.run(runContext); diff --git a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java index fd73001c..c6d10359 100644 --- a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java +++ b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java @@ -40,9 +40,9 @@ void run() throws Exception { .image("amazon/aws-cli") .entryPoint(Collections.emptyList()) .build()) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .env(Map.of("{{ inputs.envKey }}", "{{ inputs.envValue }}")) .commands(List.of( diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java index ade7d89b..42097798 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java @@ -18,12 +18,12 @@ void run() throws Exception { var runContext = runContextFactory.of(); var delete = DeleteItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .key(Map.of("id", "1")) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .key(Property.of(Map.of("id", "1"))) .build(); createTable(runContext, delete); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java index aa2b72ad..55f3a412 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java @@ -17,12 +17,12 @@ void run() throws Exception { var runContext = runContextFactory.of(); var get = GetItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .key(Map.of("id", "1")) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .key(Property.of(Map.of("id", "1"))) .build(); createTable(runContext, get); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java index 462d07cc..6df548d2 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java @@ -16,11 +16,11 @@ void runMap() 
throws Exception { var runContext = runContextFactory.of(); var put = PutItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) .item(Map.of( "id", "1", "firstname", "John", @@ -40,11 +40,11 @@ void runString() throws Exception { var runContext = runContextFactory.of(); var put = PutItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) .item("{\"id\": \"1\", \"firstname\": \"Jane\", \"lastname\": \"Doe\"}") .build(); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java index 495f2314..330ce8ef 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java @@ -21,14 +21,14 @@ void runFetch() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .keyConditionExpression("id = :id") - .expressionAttributeValues(Map.of(":id", "1")) - .fetchType(FetchType.FETCH) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .keyConditionExpression(Property.of("id = :id")) + .expressionAttributeValues(Property.of(Map.of(":id", "1"))) + .fetchType(Property.of(FetchType.FETCH)) .build(); createTable(runContext, query); @@ -48,15 +48,15 @@ void runFetchWithExpression() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .keyConditionExpression("id = :id") - .filterExpression("lastname = :lastname") - .expressionAttributeValues(Map.of(":id", "1", ":lastname", "Doe")) - .fetchType(FetchType.FETCH) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .keyConditionExpression(Property.of("id = :id")) + .filterExpression(Property.of("lastname = 
:lastname")) + .expressionAttributeValues(Property.of(Map.of(":id", "1", ":lastname", "Doe"))) + .fetchType(Property.of(FetchType.FETCH)) .build(); createTable(runContext, query); @@ -76,14 +76,14 @@ void runFetchMultipleExpression() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .keyConditionExpression("id = :id") - .expressionAttributeValues(Map.of(":id", "1")) - .fetchType(FetchType.FETCH) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .keyConditionExpression(Property.of("id = :id")) + .expressionAttributeValues(Property.of(Map.of(":id", "1"))) + .fetchType(Property.of(FetchType.FETCH)) .build(); createTable(runContext, query); @@ -103,14 +103,14 @@ void runFetchOne() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .keyConditionExpression("id = :id") - .expressionAttributeValues(Map.of(":id", "1")) - .fetchType(FetchType.FETCH_ONE) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .keyConditionExpression(Property.of("id = :id")) + .expressionAttributeValues(Property.of(Map.of(":id", "1"))) + .fetchType(Property.of(FetchType.FETCH_ONE)) .build(); createTable(runContext, query); @@ -131,14 +131,14 @@ void runStored() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .keyConditionExpression("id = :id") - .expressionAttributeValues(Map.of(":id", "1")) - .fetchType(FetchType.STORE) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .keyConditionExpression(Property.of("id = :id")) + .expressionAttributeValues(Property.of(Map.of(":id", "1"))) + .fetchType(Property.of(FetchType.STORE)) .build(); createTable(runContext, query); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java index 888f7219..61022ce7 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java @@ -21,14 +21,14 @@ void runFetch() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - 
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .filterExpression("lastname = :lastname") - .expressionAttributeValues(Map.of(":lastname", "Doe")) - .fetchType(FetchType.FETCH) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .filterExpression(Property.of("lastname = :lastname")) + .expressionAttributeValues(Property.of(Map.of(":lastname", "Doe"))) + .fetchType(Property.of(FetchType.FETCH)) .build(); createTable(runContext, scan); @@ -48,12 +48,12 @@ void runFetchNoExpression() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .fetchType(FetchType.FETCH) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .fetchType(Property.of(FetchType.FETCH)) .build(); createTable(runContext, scan); @@ -73,14 +73,14 @@ void runFetchMultipleExpression() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .filterExpression("lastname = :lastname and firstname = :firstname") - .expressionAttributeValues(Map.of(":lastname", "Doe", ":firstname", "Jane")) - .fetchType(FetchType.FETCH) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .filterExpression(Property.of("lastname = :lastname and firstname = :firstname")) + .expressionAttributeValues(Property.of(Map.of(":lastname", "Doe", ":firstname", "Jane"))) + .fetchType(Property.of(FetchType.FETCH)) .build(); createTable(runContext, scan); @@ -100,14 +100,14 @@ void runFetchOne() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .filterExpression("lastname = :lastname") - .expressionAttributeValues(Map.of(":lastname", "Doe")) - .fetchType(FetchType.FETCH_ONE) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .filterExpression(Property.of("lastname = :lastname")) + 
.expressionAttributeValues(Property.of(Map.of(":lastname", "Doe"))) + .fetchType(Property.of(FetchType.FETCH_ONE)) .build(); createTable(runContext, scan); @@ -128,14 +128,14 @@ void runStored() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .tableName("persons") - .filterExpression("lastname = :lastname") - .expressionAttributeValues(Map.of(":lastname", "Doe")) - .fetchType(FetchType.STORE) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .tableName(Property.of("persons")) + .filterExpression(Property.of("lastname = :lastname")) + .expressionAttributeValues(Property.of(Map.of(":lastname", "Doe"))) + .fetchType(Property.of(FetchType.STORE)) .build(); createTable(runContext, scan); diff --git a/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java b/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java index 3cbcb720..d6a1a855 100644 --- a/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java +++ b/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java @@ -28,9 +28,9 @@ void run() throws Exception { RunContext runContext = runContextFactory.of(); GetAuthToken query = GetAuthToken.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.EC2).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.EC2).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java index cb9a11e6..64968894 100644 --- a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java +++ b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java @@ -79,10 +79,10 @@ void runMap() throws Exception { )) .build(); var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(List.of(entry, entry2, entry3)) .build(); @@ -143,10 +143,10 @@ void runStorage() throws Exception { } var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(runContext.storage().putFile(tempFile).toString()) .build(); @@ -185,10 +185,10 @@ void runString() throws Exception { )) .build(); var put = PutEvents.builder() - 
.endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(List.of(entry, entry, entry)) .build(); @@ -215,10 +215,10 @@ void runStringUpperCase() throws Exception { )) .build(); var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(List.of(entry, entry, entry)) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java index d99cd279..15865b44 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java @@ -117,11 +117,11 @@ void runMap() throws Exception { .data("record 3") .build(); var put = PutRecords.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .streamName("streamName") + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .streamName(Property.of("streamName")) .records(List.of(record, record2, record3)) .build(); @@ -169,12 +169,12 @@ void runStorage() throws Exception { } var put = PutRecords.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .records(runContext.storage().putFile(tempFile).toString()) - .streamName("streamName") + .streamName(Property.of("streamName")) .build(); @@ -222,12 +222,12 @@ void runStorageUpperCase() throws Exception { } var put = PutRecords.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .records(runContext.storage().putFile(tempFile).toString()) - .streamName("streamName") + .streamName(Property.of("streamName")) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java b/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java index cf7fd5a5..799b6475 100644 --- a/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java +++ b/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java @@ -28,13 +28,13 @@ public void setUp() { public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Exception { // Given var invoke = Invoke.builder() - 
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()) - .functionArn(FUNCTION_NAME) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString())) + .functionArn(Property.of(FUNCTION_NAME)) .id(InvokeTest.class.getSimpleName()) .type(InvokeTest.class.getName()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .build(); var client = invoke.client(context); @@ -62,13 +62,13 @@ public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Excep public void givenNotFoundLambda_whenInvoked_thenErrorNoMetrics() throws Exception { // Given var invoke = Invoke.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()) - .functionArn("Fake_ARN") + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString())) + .functionArn(Property.of("Fake_ARN")) .id(InvokeTest.class.getSimpleName()) .type(InvokeTest.class.getName()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .build(); var client = invoke.client(context); @@ -89,14 +89,14 @@ public void givenFailingLambda_whenInvoked_thenFailureNoMetrics() throws Excepti // ask for an error in the Lambda by function param (see test resource lambda/test.py) params.put("action", "error"); var invoke = Invoke.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()) - .functionArn(FUNCTION_NAME) - .functionPayload(params) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString())) + .functionArn(Property.of(FUNCTION_NAME)) + .functionPayload(Property.of(params)) .id(InvokeTest.class.getSimpleName()) .type(InvokeTest.class.getName()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .build(); var client = invoke.client(context); diff --git a/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java b/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java index 8a3eedd2..d136a777 100644 --- a/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java +++ b/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java @@ -9,16 +9,19 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; + import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.file.Files; +import java.util.Collections; +import java.util.Map; import java.util.Optional; import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContextProperty; import io.kestra.core.storages.Storage; import org.apache.http.entity.ContentType; import org.junit.jupiter.api.AfterEach; @@ -42,12 +45,15 @@ @ExtendWith(MockitoExtension.class) public 
class InvokeUnitTest { - + private Invoke invoke; @Mock(strictness = Strictness.LENIENT) private RunContext context; + @Mock(strictness = Strictness.LENIENT) + private RunContextProperty runContextProperty; + @Mock(strictness = Strictness.LENIENT) private Storage storage; @@ -56,18 +62,30 @@ public class InvokeUnitTest { private File tempFile; + private String testValue; + @BeforeEach void setUp() throws IOException, IllegalVariableEvaluationException { given(context.storage()).willReturn(storage); given(context.workingDir()).willReturn(workingDir); given(context.workingDir().createTempFile()).willReturn(Files.createTempFile("test", "lambdainvoke")); given(context.metric(any())).willReturn(context); - given(context.render(anyString())).willAnswer(new Answer() { + given(context.render(any(Property.class))).willAnswer(new Answer<RunContextProperty>() { + @Override + public RunContextProperty answer(InvocationOnMock invocation) throws Throwable { + testValue = invocation.getArgument(0, Property.class).toString(); + return runContextProperty; + } + }); + given(runContextProperty.as(String.class)).willAnswer(new Answer<Optional<String>>() { @Override - public String answer(InvocationOnMock invocation) throws Throwable { - return invocation.getArgument(0, String.class).toString(); + public Optional<String> answer(InvocationOnMock invocation) throws Throwable { + return Optional.of(testValue); } }); + + given(runContextProperty.asMap(any(), any())).willAnswer((Answer) invocation -> Collections.emptyMap()); + given(storage.putFile(any(File.class))).willAnswer(new Answer() { @Override public URI answer(InvocationOnMock invocation) throws Throwable { @@ -77,14 +95,14 @@ public URI answer(InvocationOnMock invocation) throws Throwable { }); invoke = Invoke.builder() - .functionArn("test_function_arn") - .functionPayload(null) // w/o paramters now + .functionArn(Property.of("test_function_arn")) + .functionPayload(Property.of((Map.of()))) // w/o parameters now .id(InvokeUnitTest.class.getSimpleName()) .type(InvokeUnitTest.class.getName()) - .accessKeyId("test_accessKeyId") - .secretKeyId("test_secretKeyId") + .accessKeyId(Property.of("test_accessKeyId")) + .secretKeyId(Property.of("test_secretKeyId")) .region(Property.of("test_region")) - .build(); + .build(); } @AfterEach @@ -99,12 +117,12 @@ void tearDown() { void testParseContentType_NoContentType_Binary() { assertEquals(ContentType.APPLICATION_OCTET_STREAM, invoke.parseContentType(Optional.empty()), "Should be binary"); } - + @Test void testParseContentType_BadContent_Binary() { assertEquals(ContentType.APPLICATION_OCTET_STREAM, invoke.parseContentType(Optional.of("fake/type")), "Should be binary"); } - + @Test void testParseContentType_JSON() { assertEquals(ContentType.APPLICATION_JSON.getMimeType().toString(), @@ -115,7 +133,7 @@ void testParseContentType_JSON() { @Test void testReadError_NotJsonType(@Mock SdkBytes bytes) { assertThrows(LambdaInvokeException.class, () -> { - invoke.handleError(invoke.getFunctionArn(), ContentType.APPLICATION_OCTET_STREAM, bytes); + invoke.handleError(invoke.getFunctionArn().toString(), ContentType.APPLICATION_OCTET_STREAM, bytes); }, "Should throw an error" ); } @@ -126,7 +144,7 @@ void testReadError_FromJsonMessage(@Mock SdkBytes bytes) { given(bytes.asUtf8String()).willReturn( "{\"errorMessage\": \"" + errorText + "\", \"errorType\": \"KeyError\"}"); Throwable throwable = assertThrows(LambdaInvokeException.class, () -> { - invoke.handleError(invoke.getFunctionArn(), ContentType.APPLICATION_JSON, bytes); +
invoke.handleError(invoke.getFunctionArn().toString(), ContentType.APPLICATION_JSON, bytes); }, "Should throw an error"); assertTrue(throwable.getMessage().indexOf(errorText) > 0, "Exception message should contain an original message"); @@ -138,7 +156,7 @@ void testHandleContent_SaveFile_ReturnOutput(@Mock SdkBytes bytes) throws IOExce var data = "some raw data"; given(bytes.asInputStream()).willReturn(new ByteArrayInputStream(data.getBytes())); - Output res = invoke.handleContent(context, invoke.getFunctionArn(), ContentType.APPLICATION_OCTET_STREAM, bytes); + Output res = invoke.handleContent(context, invoke.getFunctionArn().toString(), ContentType.APPLICATION_OCTET_STREAM, bytes); checkOutput(data, res); } @@ -155,13 +173,13 @@ private void checkOutput(final String originalData, final Output result) throws "Content type must mach"); } - // ******** BDD usecases ******** + // ******** BDD usecases ******** @Test void givenFunctionArnNoParams_whenInvokeLambda_thenOutputWithFile( @Mock LambdaClient awsLambda, @Mock InvokeResponse awsResponse, @Mock SdkHttpResponse awsHttpResponse, - @Mock SdkBytes payload) + @Mock SdkBytes payload) throws IllegalVariableEvaluationException, IOException { //Given: functionArn and no input params, AWS Lambda clinet mocked for the expected behaviour var data = "some raw data"; @@ -171,16 +189,16 @@ void givenFunctionArnNoParams_whenInvokeLambda_thenOutputWithFile( given(awsResponse.sdkHttpResponse()).willReturn(awsHttpResponse); given(awsResponse.payload()).willReturn(payload); given(awsLambda.invoke(any(InvokeRequest.class))).willReturn(awsResponse); - + // Mock AbstractLambdaInvoke.client() to return the mocked AWS client var spyInvoke = spy(invoke); doReturn(awsLambda).when(spyInvoke).client(any()); - - // When + + // When Output res = assertDoesNotThrow(() -> { return spyInvoke.run(context); }, "No exception should be thrown"); - + // Then checkOutput(data, res); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java b/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java index ef506a05..bf2b17e3 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java @@ -45,10 +45,10 @@ protected String createBucket(String bucket) throws Exception { CreateBucket createBucket = CreateBucket.builder() .id(AllTest.class.getSimpleName()) .type(CreateBucket.class.getName()) - .bucket(bucket) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(bucket)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .build(); @@ -77,27 +77,27 @@ protected String upload(String dir, String bucket) throws Exception { Upload upload = Upload.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(bucket) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(bucket)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + 
.secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(source.toString()) - .key(dir + "/" + out + ".yml") + .key(Property.of(dir + "/" + out + ".yml")) .build(); upload.run(runContext(upload)); - return upload.getKey(); + return upload.getKey().toString(); } protected List.ListBuilder list() { return List.builder() .id(ListTest.class.getSimpleName()) .type(List.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/AllTest.java b/src/test/java/io/kestra/plugin/aws/s3/AllTest.java index db6f4454..326f24cd 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/AllTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/AllTest.java @@ -27,12 +27,12 @@ void run() throws Exception { List list = List.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .prefix("tasks/aws/upload/") + .prefix(Property.of("tasks/aws/upload/")) .build(); List.Output listOutput = list.run(runContext(list)); @@ -42,12 +42,12 @@ void run() throws Exception { Download download = Download.builder() .id(AllTest.class.getSimpleName()) .type(Download.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .key(key) + .key(Property.of(key)) .build(); Download.Output run = download.run(runContext(download)); @@ -61,12 +61,12 @@ void run() throws Exception { Delete delete = Delete.builder() .id(AllTest.class.getSimpleName()) .type(Delete.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .key(key) + .key(Property.of(key)) .build(); Delete.Output deleteOutput = delete.run(runContext(delete)); assertThat(deleteOutput.getDeleteMarker(), 
is(nullValue())); diff --git a/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java b/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java index c8e4023c..a21b5cad 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java @@ -19,33 +19,33 @@ void run(Boolean delete) throws Exception { Copy task = Copy.builder() .id(CopyTest.class.getSimpleName()) .type(List.class.getName()) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(Copy.CopyObjectFrom.builder() - .bucket(this.BUCKET) - .key(upload) + .bucket(Property.of(this.BUCKET)) + .key(Property.of(upload)) .build() ) .to(Copy.CopyObject.builder() - .key(move) + .key(Property.of(move)) .build() ) - .delete(delete) + .delete(Property.of(delete)) .build(); Copy.Output run = task.run(runContext(task)); assertThat(run.getKey(), is(move)); // list - List list = list().prefix(move).build(); + List list = list().prefix(Property.of(move)).build(); List.Output listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(1)); // original is here - list = list().prefix(upload).build(); + list = list().prefix(Property.of(upload)).build(); listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(delete ? 0 : 1)); diff --git a/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java b/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java index 07f13704..9b766f9c 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java @@ -21,10 +21,10 @@ void run() throws Exception { DeleteList task = DeleteList.builder() .id(ListTest.class.getSimpleName()) .type(List.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .concurrent(5) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java b/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java index 71303a22..b11ecf20 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java @@ -26,12 +26,12 @@ void delete() throws Exception { Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + 
.secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .action(ActionInterface.Action.DELETE) + .action(Property.of(ActionInterface.Action.DELETE)) .build(); Downloads.Output run = task.run(runContext(task)); @@ -55,14 +55,14 @@ void move() throws Exception { Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) - .bucket("{{bucket}}") - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(new Property<>("{{bucket}}")) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .action(ActionInterface.Action.MOVE) + .action(Property.of(ActionInterface.Action.MOVE)) .moveTo(Copy.CopyObject.builder() - .key("/tasks/s3-move") + .key(Property.of("/tasks/s3-move")) .build() ) .build(); @@ -72,11 +72,11 @@ void move() throws Exception { assertThat(run.getObjects().size(), is(2)); assertThat(run.getOutputFiles().size(), is(2)); - List list = list().prefix("/tasks/s3-from").build(); + List list = list().prefix(Property.of("/tasks/s3-from")).build(); List.Output listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(0)); - list = list().prefix("/tasks/s3-move").build(); + list = list().prefix(Property.of("/tasks/s3-move")).build(); listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(2)); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/ListTest.java b/src/test/java/io/kestra/plugin/aws/s3/ListTest.java index e617bf8f..4af39d71 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/ListTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/ListTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.aws.s3; +import io.kestra.core.models.property.Property; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.utils.IdUtils; import org.apache.commons.lang3.StringUtils; @@ -41,22 +42,22 @@ void run() throws Exception { // Dir listing task = list() - .filter(ListInterface.Filter.FILES) - .prefix("/tasks/s3/" + dir + "/sub") + .filter(Property.of(ListInterface.Filter.FILES)) + .prefix(Property.of("/tasks/s3/" + dir + "/sub")) .build(); run = task.run(runContext(task)); assertThat(run.getObjects().size(), is(1)); // prefix task = list() - .prefix("/tasks/s3/" + dir + "/sub") + .prefix(Property.of("/tasks/s3/" + dir + "/sub")) .build(); run = task.run(runContext(task)); assertThat(run.getObjects().size(), is(1)); // regexp task = list() - .regexp("/tasks/s3/.*/" + StringUtils.substringAfterLast(lastFileName, "/")) + .regexp(Property.of("/tasks/s3/.*/" + StringUtils.substringAfterLast(lastFileName, "/"))) .build(); run = task.run(runContext(task)); assertThat(run.getObjects().size(), is(1)); diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index efa7ea3c..dcccafd8 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.s3; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; import io.kestra.core.queues.QueueFactoryInterface; import 
io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; @@ -43,7 +44,7 @@ class TriggerTest extends AbstractTest { void deleteAction() throws Exception { String bucket = "trigger-test"; this.createBucket(bucket); - List listTask = list().bucket(bucket).build(); + List listTask = list().bucket(Property.of(bucket)).build(); // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); @@ -100,7 +101,7 @@ void deleteAction() throws Exception { void noneAction() throws Exception { String bucket = "trigger-none-action-test"; this.createBucket(bucket); - List listTask = list().bucket(bucket).build(); + List listTask = list().bucket(Property.of(bucket)).build(); // wait for execution CountDownLatch queueCount = new CountDownLatch(1); diff --git a/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java b/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java index 00ed744e..808e5f02 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java @@ -24,13 +24,13 @@ void run() throws Exception { Upload upload = Upload.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(java.util.List.of(source1.toString(), source2.toString(), source3.toString(), source4.toString())) - .key(IdUtils.create() + "/") + .key(Property.of(IdUtils.create() + "/")) .build(); upload.run(runContext(upload)); @@ -38,10 +38,10 @@ void run() throws Exception { List list = List.builder() .id(UploadsTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .bucket(Property.of(this.BUCKET)) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .prefix(upload.getKey()) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java b/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java index d0918f72..7388b4e2 100644 --- a/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java +++ b/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java @@ -16,11 +16,11 @@ void run() throws Exception { var runContext = runContextFactory.of(); var publish = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SNS).toString()) - .topicArn(TOPIC_ARN) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SNS).toString())) + .topicArn(Property.of(TOPIC_ARN)) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( 
Message.builder().data("Hello World").build(), diff --git a/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java b/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java index 4eb127cd..ff70d62e 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.aws.sqs; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContextFactory; import io.kestra.plugin.aws.AbstractLocalStackTest; import io.kestra.core.junit.annotations.KestraTest; diff --git a/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java b/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java index 6dcb229b..89099e14 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java @@ -17,11 +17,11 @@ void runText() throws Exception { var runContext = runContextFactory.of(); var publish = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) - .queueUrl(queueUrl()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.of(queueUrl())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build(), @@ -34,12 +34,12 @@ void runText() throws Exception { assertThat(publishOutput.getMessagesCount(), is(2)); var consume = Consume.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) - .queueUrl(queueUrl()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.of(queueUrl())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) - .maxRecords(2) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .maxRecords(Property.of(2)) .build(); var consumeOutput = consume.run(runContextFactory.of()); @@ -51,11 +51,11 @@ void runJson() throws Exception { var runContext = runContextFactory.of(); var publish = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) - .queueUrl(queueUrl()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.of(queueUrl())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data(""" @@ -70,13 +70,13 @@ void runJson() throws Exception { assertThat(publishOutput.getMessagesCount(), is(2)); var consume = Consume.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) - .queueUrl(queueUrl()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.of(queueUrl())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - 
.secretKeyId(localstack.getSecretKey()) - .serdeType(SerdeType.JSON) - .maxRecords(2) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) + .serdeType(Property.of(SerdeType.JSON)) + .maxRecords(Property.of(2)) .build(); var consumeOutput = consume.run(runContextFactory.of()); diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index 859858e0..a863eaad 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -71,11 +71,11 @@ void flow() throws Exception { // publish two messages to trigger the flow Publish task = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) - .queueUrl(queueUrl()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.of(queueUrl())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build() diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index c9eded42..fd34fff2 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -73,11 +73,11 @@ void flow() throws Exception { // publish two messages to trigger the flow Publish task = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) - .queueUrl(queueUrl()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) + .queueUrl(Property.of(queueUrl())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build(),