Skip to content

Commit

Permalink
fix: refactor tasks to TransformItems and TransformValue
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jul 25, 2024
1 parent 216de9f commit 455c396
Show file tree
Hide file tree
Showing 60 changed files with 1,094 additions and 674 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ subprojects { Project subproject ->

github {
user 'kestra-io'
repository 'plugin-transforms'
repository 'plugin-transform'
license 'Apache'
}

Expand Down Expand Up @@ -239,7 +239,7 @@ release {
tagTemplate = 'v${version}'
buildTasks = ['classes']
git {
requireBranch.set('master')
requireBranch.set('main')
}
}

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.kestra.plugin.transform.grok;

import io.kestra.core.models.annotations.PluginProperty;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;

import java.util.List;
import java.util.Map;

/**
 * Common configuration properties shared by the Grok transform tasks.
 *
 * <p>Implementations expose the Grok pattern(s) to apply and the options
 * controlling how matching is performed. Getters are typically generated
 * by Lombok on the concrete task classes.
 */
public interface GrokInterface {

    // Single-pattern form; when set it takes precedence over getPatterns() (see Transform#patterns()).
    @PluginProperty
    @Schema(title = "The Grok pattern to match.")
    String getPattern();

    // Multi-pattern form; used only when getPattern() is null.
    @PluginProperty
    @Schema(title = "The list of Grok patterns to match.")
    List<String> getPatterns();

    // Extra directories scanned for user-supplied pattern files,
    // resolved relative to the task's working directory.
    @PluginProperty
    @Schema(
        title = "List of user-defined pattern directories.",
        description = "Directories must be paths relative to the working directory."
    )
    List<String> getPatternsDir();

    // Inline pattern definitions; entries with existing names override built-ins.
    @PluginProperty
    @Schema(
        title = "Custom pattern definitions.",
        description = "A map of pattern-name and pattern pairs defining custom patterns to be used by the current tasks. Patterns matching existing names will override the pre-existing definition. "
    )
    Map<String, String> getPatternDefinitions();

    // When true, unnamed (auto-generated) captures are dropped from the result.
    @PluginProperty
    @Schema(title = "If `true`, only store named captures from grok.")
    boolean isNamedCapturesOnly();

    // When true, matching stops at the first pattern that succeeds.
    @PluginProperty
    @Schema(
        title = "If `true`, break on first match.",
        description = "The first successful match by grok will result in the task being finished. Set to `false` if you want the task to try all configured patterns."
    )
    boolean isBreakOnFirstMatch();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.kestra.plugin.transform.grok;

import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.transform.grok.pattern.GrokMatcher;
import io.kestra.plugin.transform.grok.pattern.GrokPatternCompiler;
import io.kestra.plugin.transform.grok.pattern.GrokPatternResolver;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class Transform extends Task {

    private String pattern;

    private List<String> patterns;

    private List<String> patternsDir;

    private Map<String, String> patternDefinitions;

    @Builder.Default
    private boolean namedCapturesOnly = true;

    @Builder.Default
    private boolean breakOnFirstMatch = true;

    @Getter(AccessLevel.PRIVATE)
    private RunContext runContext;

    @Getter(AccessLevel.PRIVATE)
    private GrokPatternCompiler compiler;

    @Getter(AccessLevel.PRIVATE)
    private List<GrokMatcher> grokMatchers;

    /**
     * Initializes this task: builds the Grok pattern compiler and
     * pre-compiles every configured pattern.
     *
     * @param runContext the Kestra run context used for logging and path resolution.
     */
    public void init(final RunContext runContext) {
        this.runContext = runContext;

        final GrokPatternResolver resolver = new GrokPatternResolver(
            runContext.logger(),
            patternDefinitions(),
            patternsDir()
        );
        this.compiler = new GrokPatternCompiler(resolver, isNamedCapturesOnly());

        // eagerly compile so pattern errors surface before any data is processed
        this.grokMatchers = patterns().stream().map(compiler::compile).toList();
    }

    /**
     * Runs every compiled matcher against the given bytes and merges the
     * named captures into a single map. Later matches overwrite earlier
     * ones on key collision; matching stops after the first successful
     * matcher when {@code breakOnFirstMatch} is {@code true}.
     *
     * @param bytes the raw input to match.
     * @return the merged named captures (empty when nothing matched).
     */
    public Map<String, Object> matches(final byte[] bytes) {
        final Map<String, Object> merged = new HashMap<>();
        for (GrokMatcher matcher : grokMatchers) {
            final Map<String, Object> captured = matcher.captures(bytes);
            if (captured == null) {
                continue;
            }
            merged.putAll(captured);
            if (isBreakOnFirstMatch()) {
                break;
            }
        }
        return merged;
    }

    // Null-safe view of the user-supplied custom pattern definitions.
    private Map<String, String> patternDefinitions() {
        return patternDefinitions == null ? Collections.emptyMap() : patternDefinitions;
    }

    // Resolves the configured pattern directories against the working directory.
    private List<File> patternsDir() {
        if (this.patternsDir == null || this.patternsDir.isEmpty()) {
            return Collections.emptyList();
        }
        final List<File> resolved = new ArrayList<>(this.patternsDir.size());
        for (String dir : this.patternsDir) {
            resolved.add(runContext.workingDir().resolve(Path.of(dir)).toFile());
        }
        return resolved;
    }

    // Returns the effective pattern list: `pattern` wins over `patterns`;
    // at least one of the two must be provided.
    private List<String> patterns() {
        if (pattern != null) {
            return List.of(pattern);
        }
        if (patterns != null && !patterns.isEmpty()) {
            return patterns;
        }
        throw new IllegalArgumentException(
            "Missing required configuration, either `pattern` or `patterns` properties must not be empty.");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package io.kestra.plugin.transform.grok;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;

import java.io.BufferedWriter;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

import static io.kestra.core.utils.Rethrow.throwFunction;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
    title = "Parse arbitrary text and structure it using Grok expressions.",
    description = """
        The `TransformItems` task is similar to the famous Logstash Grok filter from the ELK stack.
        It is particularly useful for transforming unstructured data such as logs into a structured, indexable, and queryable data structure.
        The `TransformItems` task ships with all the default patterns. You can find them here: https://github.com/kestra-io/plugin-transform/tree/main/plugin-transforms-grok/src/main/resources/patterns.
        """
)
@Plugin(
    examples = {
        @Example(
            title = "Consume, parse, and structure logs events from Kafka topic.",
            full = true,
            code = """
                id: grok
                namespace: myteam
                tasks:
                - id: grok
                  type: io.kestra.plugin.transform.grok.TransformItems
                  pattern: "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}"
                  from: "{{ trigger.uri }}"
                triggers:
                - id: trigger
                  type: io.kestra.plugin.kafka.Trigger
                  topic: test_kestra
                  properties:
                    bootstrap.servers: localhost:9092
                  serdeProperties:
                    schema.registry.url: http://localhost:8085
                  keyDeserializer: STRING
                  valueDeserializer: STRING
                  groupId: kafkaConsumerGroupId
                  interval: PT30S
                  maxRecords: 5
                """
        )
    }
)
public class TransformItems extends Transform implements GrokInterface, RunnableTask<Output> {

    private static final ObjectMapper ION_OBJECT_MAPPER = JacksonMapper.ofIon();

    @Schema(
        title = "The file to be transformed.",
        description = "Must be a `kestra://` internal storage URI."
    )
    @PluginProperty(dynamic = true)
    @NotNull
    private String from;

    /**
     * Reads each item from the `from` internal-storage file, applies the
     * configured Grok pattern(s), and writes the structured result (one ION
     * record per input item) to a new internal-storage file.
     *
     * @param runContext the Kestra run context.
     * @return the output URI and the number of processed items.
     * @throws Exception if the input cannot be read, matched, or written.
     */
    @Override
    public Output run(RunContext runContext) throws Exception {
        init(runContext);

        String from = runContext.render(this.from);

        URI objectURI = new URI(from);
        try (InputStream is = runContext.storage().getFile(objectURI)) {
            Flux<String> flux = FileSerde.readAll(is, new TypeReference<String>() {
            });
            final Path outputFilePath = runContext.workingDir().createTempFile(".ion");
            try (final BufferedWriter writer = Files.newBufferedWriter(outputFilePath)) {
                // reduce with an identity value: a bare reduce(Long::sum) yields an
                // empty Mono for an empty input, and block() would then return null
                Long processedItemsTotal = flux.map(throwFunction(data -> {
                        Map<String, Object> captured = matches(data.getBytes(StandardCharsets.UTF_8));
                        writer.write(ION_OBJECT_MAPPER.writeValueAsString(captured));
                        writer.newLine();
                        return 1L;
                    }))
                    .reduce(0L, Long::sum)
                    .block();

                // flush is required: putFile reads the file while the writer is still open
                writer.flush();

                URI uri = runContext.storage().putFile(outputFilePath.toFile());
                return Output
                    .builder()
                    .uri(uri)
                    .processedItemsTotal(processedItemsTotal)
                    .build();
            } finally {
                Files.deleteIfExists(outputFilePath); // content is uploaded (or the task failed); the temp file is no longer needed
            }
        }
    }

    @Builder
    @Getter
    public static class Output implements io.kestra.core.models.tasks.Output {
        @Schema(
            title = "The transformed file URI."
        )
        private final URI uri;

        @Schema(
            title = "The total number of items that was processed by the task."
        )
        private final Long processedItemsTotal;
    }
}
Loading

0 comments on commit 455c396

Please sign in to comment.