Skip to content

Commit

Permalink
fix: minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jul 25, 2024
1 parent 455c396 commit 7877b2f
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,20 @@ public abstract class Transform extends Task {
@Builder.Default
private boolean breakOnFirstMatch = true;

@Getter(AccessLevel.PRIVATE)
private RunContext runContext;

@Getter(AccessLevel.PRIVATE)
private GrokPatternCompiler compiler;

@Getter(AccessLevel.PRIVATE)
private List<GrokMatcher> grokMatchers;

public void init(final RunContext runContext) {
this.runContext = runContext;

// create compiler
this.compiler = new GrokPatternCompiler(
new GrokPatternResolver(
runContext.logger(),
patternDefinitions(),
patternsDir()
patternsDir(runContext)
),
isNamedCapturesOnly()
);
Expand Down Expand Up @@ -92,7 +88,7 @@ private Map<String, String> patternDefinitions() {
return Optional.ofNullable(patternDefinitions).orElse(Collections.emptyMap());
}

private List<File> patternsDir() {
private List<File> patternsDir(RunContext runContext) {
if (this.patternsDir == null || this.patternsDir.isEmpty()) return Collections.emptyList();

return this.patternsDir
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -102,19 +99,15 @@ public Output run(RunContext runContext) throws Exception {
Flux<String> flux = FileSerde.readAll(is, new TypeReference<String>() {
});
final Path ouputFilePath = runContext.workingDir().createTempFile(".ion");
try (final BufferedWriter writer = Files.newBufferedWriter(ouputFilePath)) {
Long processedItemsTotal = flux.map(throwFunction(data -> {
Map<String, Object> captured = matches(data.getBytes(StandardCharsets.UTF_8));
writer.write(ION_OBJECT_MAPPER.writeValueAsString(captured));
writer.newLine();
return 1L;
}))
.reduce(Long::sum)
.block();
try {
// transform
Flux<Map<String, Object>> values = flux.map(data -> matches(data.getBytes(StandardCharsets.UTF_8)));

writer.flush();
Long processedItemsTotal = FileSerde.writeAll(Files.newOutputStream(ouputFilePath), values).block();

URI uri = runContext.storage().putFile(ouputFilePath.toFile());

// output
return Output
.builder()
.uri(uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;

import java.time.Duration;

public interface JSONataInterface {

@PluginProperty(dynamic = true)
Expand All @@ -17,9 +15,4 @@ public interface JSONataInterface {
@Schema(title = "The maximum number of recursive calls allowed for the JSONata transformation.")
@NotNull
Integer getMaxDepth();

@PluginProperty(dynamic = true)
@Schema(title = "The maximum duration allowed for the evaluation to occur. If it takes longer the task will fail.")
@NotNull
Duration getTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

@SuperBuilder
@ToString
Expand All @@ -36,23 +37,17 @@ public abstract class Transform extends Task implements JSONataInterface, Runnab
@Builder.Default
private Integer maxDepth = 1000;

@Builder.Default
private Duration timeout = Duration.ofSeconds(10);

@Getter(AccessLevel.PRIVATE)
private RunContext runContext;

@Getter(AccessLevel.PRIVATE)
private Expressions expressions;

public void init(RunContext runContext) throws Exception {
this.runContext = runContext;
this.expressions = parseExpression(runContext);
}

protected JsonNode evaluateExpression(JsonNode jsonNode) {
try {
return this.expressions.evaluate(jsonNode, getTimeout().toMillis(), getMaxDepth());
long timeoutInMilli = Optional.ofNullable(getTimeout()).map(Duration::toMillis).orElse(Long.MAX_VALUE);
return this.expressions.evaluate(jsonNode, timeoutInMilli, getMaxDepth());
} catch (EvaluateException e) {
throw new RuntimeException("Failed to evaluate expression", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
Expand Down Expand Up @@ -86,19 +83,13 @@ public Output run(RunContext runContext) throws Exception {
Flux<JsonNode> flux = FileSerde.readAll(is, new TypeReference<JsonNode>() {
});
final Path ouputFilePath = runContext.workingDir().createTempFile(".ion");
try (final BufferedWriter writer = Files.newBufferedWriter(ouputFilePath)) {
try {

// transform
Long processedItemsTotal = flux.map(throwFunction(jsonNode -> {
jsonNode = evaluateExpression(jsonNode);
writer.write(ION_OBJECT_MAPPER.writeValueAsString(jsonNode));
writer.newLine();
return 1L;
}))
.reduce(Long::sum)
.block();
Flux<JsonNode> values = flux.map(this::evaluateExpression);

Long processedItemsTotal = FileSerde.writeAll(Files.newOutputStream(ouputFilePath), values).block();

writer.flush();
URI uri = runContext.storage().putFile(ouputFilePath.toFile());

// output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@
)
public class TransformValue extends Transform implements RunnableTask<Output> {

private static final ObjectMapper ION_OBJECT_MAPPER = JacksonMapper.ofIon();
public static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper();
public static final ObjectMapper JSON_OBJECT_MAPPER = JacksonMapper.ofJson();

@Schema(
title = "The value to be transformed.",
Expand Down Expand Up @@ -106,7 +105,7 @@ public Output run(RunContext runContext) throws Exception {

private static JsonNode parseJson(String from) {
try {
return DEFAULT_OBJECT_MAPPER.readTree(from);
return JSON_OBJECT_MAPPER.readTree(from);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Failed to parse the JSON object passed through the `from` property.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.InputStream;
Expand All @@ -30,7 +31,9 @@ void shouldGetOutputForValidExprReturningStringForFromURI() throws Exception {
RunContext runContext = runContextFactory.of();
final Path ouputFilePath = runContext.workingDir().createTempFile(".ion");
try (final OutputStream os = Files.newOutputStream(ouputFilePath)) {
FileSerde.writeAll(os, Mono.just(new ObjectMapper().readValue(Features.DATASET_ACCOUNT_ORDER_JSON, Map.class)).flux()).block();
FileSerde.writeAll(os, Flux.just(
new ObjectMapper().readValue(Features.DATASET_ACCOUNT_ORDER_JSON, Map.class),
new ObjectMapper().readValue(Features.DATASET_ACCOUNT_ORDER_JSON, Map.class))).block();
os.flush();
}
URI uri = runContext.storage().putFile(ouputFilePath.toFile());
Expand All @@ -45,7 +48,7 @@ void shouldGetOutputForValidExprReturningStringForFromURI() throws Exception {

// Then
Assertions.assertNotNull(output);
Assertions.assertEquals(1, output.getProcessedItemsTotal());
Assertions.assertEquals(2, output.getProcessedItemsTotal());

InputStream is = runContext.storage().getFile(output.getUri());
String transformationResult = FileSerde.readAll(is, new TypeReference<String>() {
Expand Down

0 comments on commit 7877b2f

Please sign in to comment.