Skip to content

Commit

Permalink
Merge branch 'kestra-io:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
aballiet authored Sep 9, 2023
2 parents 3d81373 + 3d7e50e commit 024dea3
Show file tree
Hide file tree
Showing 18 changed files with 591 additions and 344 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=0.11.0-SNAPSHOT
kestraVersion=0.11.+
version=0.12.0-SNAPSHOT
kestraVersion=[0.12,)
micronautVersion=3.9.3
lombokVersion=1.18.28
28 changes: 19 additions & 9 deletions src/main/java/io/kestra/plugin/gcp/cli/GCloudCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,32 @@
}
)
public class GCloudCLI extends Task implements RunnableTask<ScriptOutput> {
private static final String DEFAULT_IMAGE = "google/cloud-sdk";

@NotNull
@NotEmpty
@Schema(
title = "The full service account JSON key to use to authenticate to gcloud"
title = "The full service account JSON key to use to authenticate to gcloud"
)
@PluginProperty(dynamic = true)
protected String serviceAccount;

@Schema(
title = "The project id to scope the commands to"
title = "The project id to scope the commands to"
)
@PluginProperty(dynamic = true)
protected String projectId;

@Schema(
title = "The commands to run"
title = "The commands to run"
)
@PluginProperty(dynamic = true)
@NotNull
@NotEmpty
protected List<String> commands;

@Schema(
title = "Additional environment variables for the current process."
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
Expand All @@ -98,21 +100,20 @@ public class GCloudCLI extends Task implements RunnableTask<ScriptOutput> {
protected Map<String, String> env;

@Schema(
title = "Docker options when for the `DOCKER` runner"
title = "Docker options when for the `DOCKER` runner",
defaultValue = "{image=" + DEFAULT_IMAGE + ", pullPolicy=ALWAYS}"
)
@PluginProperty
@Builder.Default
protected DockerOptions docker = DockerOptions.builder()
.image("google/cloud-sdk")
.build();
protected DockerOptions docker = DockerOptions.builder().build();

@Override
public ScriptOutput run(RunContext runContext) throws Exception {

CommandsWrapper commands = new CommandsWrapper(runContext)
.withWarningOnStdErr(true)
.withRunnerType(RunnerType.DOCKER)
.withDockerOptions(this.docker)
.withDockerOptions(injectDefaults(getDocker()))
.withCommands(
ScriptService.scriptCommands(
List.of("/bin/sh", "-c"),
Expand All @@ -125,6 +126,15 @@ public ScriptOutput run(RunContext runContext) throws Exception {
return commands.run();
}

private DockerOptions injectDefaults(DockerOptions original) {
var builder = original.toBuilder();
if (original.getImage() == null) {
builder.image(DEFAULT_IMAGE);
}

return builder.build();
}

private Map<String, String> getEnv(RunContext runContext) throws IOException, IllegalVariableEvaluationException {
Map<String, String> envs = new HashMap<>();
if (serviceAccount != null) {
Expand Down
21 changes: 16 additions & 5 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Consume.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.gcp.pubsub;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import io.kestra.core.models.annotations.Example;
Expand All @@ -10,6 +11,7 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.plugin.gcp.pubsub.model.Message;
import io.kestra.plugin.gcp.pubsub.model.SerdeType;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -22,6 +24,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwRunnable;

@SuperBuilder
Expand All @@ -31,7 +35,7 @@
@NoArgsConstructor
@Schema(
title = "Consume messages from a Pub/Sub topic.",
description = "Required a maxDuration or a maxRecords."
description = "Requires a maxDuration or a maxRecords."
)
@Plugin(
examples = {
Expand All @@ -46,14 +50,14 @@
public class Consume extends AbstractPubSub implements RunnableTask<Consume.Output> {

@Schema(
title = "The Pub/Sub subscription",
title = "The Pub/Sub subscription.",
description = "The Pub/Sub subscription. It will be created automatically if it didn't exist and 'autoCreateSubscription' is enabled."
)
@PluginProperty(dynamic = true)
private String subscription;

@Schema(
title = "Whether the Pub/Sub subscription should be created if not exist"
title = "Whether the Pub/Sub subscription should be created if not exists."
)
@PluginProperty
@Builder.Default
Expand All @@ -67,6 +71,11 @@ public class Consume extends AbstractPubSub implements RunnableTask<Consume.Outp
@Schema(title = "Max duration in the Duration ISO format, after that the task will end.")
private Duration maxDuration;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -83,7 +92,7 @@ public Output run(RunContext runContext) throws Exception {
AtomicReference<Exception> threadException = new AtomicReference<>();
MessageReceiver receiver = (message, consumer) -> {
try {
FileSerde.write(outputFile, Message.of(message));
FileSerde.write(outputFile, Message.of(message, serdeType));
total.getAndIncrement();
consumer.ack();
}
Expand All @@ -92,7 +101,9 @@ public Output run(RunContext runContext) throws Exception {
consumer.nack();
}
};
var subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(FixedCredentialsProvider.create(this.credentials(runContext)))
.build();
subscriber.startAsync().awaitRunning();

while (!this.ended(total, started)) {
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.gcp.pubsub.model.Message;
import io.kestra.plugin.gcp.pubsub.model.SerdeType;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -58,6 +59,12 @@ public class Publish extends AbstractPubSub implements RunnableTask<Publish.Outp
)
private Object from;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;

@Override
public Publish.Output run(RunContext runContext) throws Exception {
var publisher = this.createPublisher(runContext);
Expand Down Expand Up @@ -88,7 +95,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
count = resultFlowable.reduce(Integer::sum).blockingGet();
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
publisher.publish(msg.to(runContext));
publisher.publish(msg.to(runContext, this.serdeType));
}

publisher.shutdown();
Expand All @@ -104,7 +111,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
private Flowable<Integer> buildFlowable(Flowable<Message> flowable, Publisher publisher, RunContext runContext) {
return flowable
.map(message -> {
publisher.publish(message.to(runContext));
publisher.publish(message.to(runContext, this.serdeType));
return 1;
});
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kestra.core.models.triggers.TriggerOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.gcp.pubsub.model.SerdeType;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand All @@ -22,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.validation.constraints.NotNull;


@SuperBuilder
Expand Down Expand Up @@ -78,6 +80,12 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
@Schema(title = "Max duration in the Duration ISO format, after that the task will end.")
private Duration maxDuration;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;

@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
RunContext runContext = conditionContext.getRunContext();
Expand All @@ -92,6 +100,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.scopes(this.scopes)
.maxRecords(this.maxRecords)
.maxDuration(this.maxDuration)
.serdeType(this.serdeType)
.build();

Consume.Output run = task.run(runContext);
Expand Down
37 changes: 27 additions & 10 deletions src/main/java/io/kestra/plugin/gcp/pubsub/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import lombok.Getter;
import lombok.extern.jackson.Jacksonized;

import java.io.IOException;
import java.util.Base64;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwBiConsumer;
Expand All @@ -19,11 +21,14 @@
@Jacksonized
public class Message {

@Schema(title = "The message data, must be base64 encoded")
@Schema(
title = "The message data, must be a string if serde type is 'STRING', otherwise a JSON object",
description = "If it's a string, it can be a dynamic property otherwise not."
)
@PluginProperty(dynamic = true)
private String data;
private Object data;

@Schema(title = "The message attribute map")
@Schema(title = "The message attributes map")
@PluginProperty(dynamic = true)
private Map<String, String> attributes;

Expand All @@ -35,10 +40,17 @@ public class Message {
@PluginProperty(dynamic = true)
private String orderingKey;

public PubsubMessage to(RunContext runContext) throws IllegalVariableEvaluationException {
public PubsubMessage to(RunContext runContext, SerdeType serdeType) throws IllegalVariableEvaluationException, IOException {
var builder = PubsubMessage.newBuilder();
if(data != null) {
builder.setData(ByteString.copyFrom(runContext.render(data).getBytes()));
byte[] serializedData;
if (data instanceof String dataStr) {
var rendered = runContext.render(dataStr);
serializedData = rendered.getBytes();
} else {
serializedData = serdeType.serialize(data);
}
builder.setData(ByteString.copyFrom(Base64.getEncoder().encode(serializedData)));
}
if(attributes != null && !attributes.isEmpty()) {
attributes.forEach(throwBiConsumer((key, value) -> builder.putAttributes(runContext.render(key), runContext.render(value))));
Expand All @@ -52,12 +64,17 @@ public PubsubMessage to(RunContext runContext) throws IllegalVariableEvaluationE
return builder.build();
}

public static Message of(PubsubMessage message) {
return Message.builder()
public static Message of(PubsubMessage message, SerdeType serdeType) throws IOException {
var builder = Message.builder()
.messageId(message.getMessageId())
.data(message.getData().toString())
.attributes(message.getAttributesMap())
.orderingKey(message.getOrderingKey())
.build();
.orderingKey(message.getOrderingKey());

if (message.getData() != null) {
var decodedData = Base64.getDecoder().decode(message.getData().toByteArray());
builder.data(serdeType.deserialize(decodedData));
}

return builder.build();
}
}
29 changes: 29 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/pubsub/model/SerdeType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.kestra.plugin.gcp.pubsub.model;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.serializers.JacksonMapper;

import java.io.IOException;

public enum SerdeType {
STRING,
JSON;

private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson(false);

public Object deserialize(byte[] message) throws IOException {
if (this == SerdeType.JSON) {
return OBJECT_MAPPER.readValue(message, Object.class);
} else {
return message;
}
}

public byte[] serialize(Object message) throws IOException {
if (this == SerdeType.JSON) {
return OBJECT_MAPPER.writeValueAsBytes(message);
} else {
return message.toString().getBytes();
}
}
}
Loading

0 comments on commit 024dea3

Please sign in to comment.