Skip to content

Commit

Permalink
refactor: upgrade to v2 property
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Sep 23, 2024
1 parent 7accefa commit df7f374
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 195 deletions.
85 changes: 35 additions & 50 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.fasterxml.jackson.annotation.JsonSetter;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.*;
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
import io.kestra.core.models.tasks.runners.ScriptService;
Expand Down Expand Up @@ -43,108 +43,92 @@ public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOut
@Schema(
title = "Stop execution at the first failure."
)
@PluginProperty
Boolean failFast = false;
Property<Boolean> failFast = Property.of(false);

@Builder.Default
@Schema(
title = "When dbt would normally warn, raise an exception.",
description = "Examples include --models that selects nothing, deprecations, configurations with no " +
"associated models, invalid test configurations, and missing sources/refs in tests."
)
@PluginProperty
Boolean warnError = false;
Property<Boolean> warnError = Property.of(false);

@Builder.Default
@Schema(
title = "Display debug logging during dbt execution.",
description = "Useful for debugging and making bug reports."
)
@PluginProperty
Boolean debug = false;
Property<Boolean> debug = Property.of(false);

@Schema(
title = "Which directory to look in for the dbt_project.yml file.",
description = "Default is the current working directory and its parents."
)
@PluginProperty
String projectDir;
Property<String> projectDir;

@Builder.Default
@Schema(
title = "The path to the dbt CLI"
)
@PluginProperty(dynamic = true)
String dbtPath = "./bin/dbt";
Property<String> dbtPath = Property.of("./bin/dbt");

@Schema(
title = "The `profiles.yml` file content",
description = "If a `profile.yml` file already exist in the current working directory, it will be overridden."
)
@PluginProperty(dynamic = true)
private String profiles;
Property<String> profiles;

@Schema(
title = "The task runner to use.",
description = """
title = "The task runner to use.",
description = """
Task runners are provided by plugins, each have their own properties.
If you change from the default one, be careful to also configure the entrypoint to an empty list if needed."""
)
@PluginProperty
@Builder.Default
@Valid
protected TaskRunner taskRunner = Docker.builder()
.type(Docker.class.getName())
.entryPoint(Collections.emptyList())
.build();
protected Property<TaskRunner> taskRunner = Property.of(Docker.builder()
.type(Docker.class.getName())
.entryPoint(Collections.emptyList())
.build());

@Schema(title = "The task runner container image, only used if the task runner is container-based.")
@PluginProperty(dynamic = true)
@Builder.Default
protected String containerImage = DEFAULT_IMAGE;
protected Property<String> containerImage = Property.of(DEFAULT_IMAGE);

@Schema(
title = "The runner type.",
description = "Deprecated, use 'taskRunner' instead."
)
@Deprecated
@PluginProperty
protected RunnerType runner;
protected Property<RunnerType> runner;

@Schema(
title = "Deprecated, use 'taskRunner' instead"
)
@PluginProperty
@Deprecated
private DockerOptions docker;
private Property<DockerOptions> docker;

@Schema(title = "Deprecated, use the `docker` property instead", deprecated = true)
@PluginProperty
@Deprecated
private DockerOptions dockerOptions;
private Property<DockerOptions> dockerOptions;

@JsonSetter
public void setDockerOptions(DockerOptions dockerOptions) {
public void setDockerOptions(Property<DockerOptions> dockerOptions) {
this.dockerOptions = dockerOptions;
this.docker = dockerOptions;
}

@Schema(
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
protected Map<String, String> env;
protected Property<Map<String, String>> env;

@Builder.Default
@Schema(
title = "Parse run result",
description = "Parsing run result to display duration of each task inside dbt"
)
@PluginProperty
protected Boolean parseRunResults = true;
protected Property<Boolean> parseRunResults = Property.of(Boolean.TRUE);

private NamespaceFiles namespaceFiles;

Expand All @@ -157,14 +141,14 @@ public void setDockerOptions(DockerOptions dockerOptions) {
@Override
public ScriptOutput run(RunContext runContext) throws Exception {
CommandsWrapper commandsWrapper = new CommandsWrapper(runContext)
.withEnv(this.getEnv())
.withEnv(this.getEnv().asMap(runContext, String.class, String.class))
.withNamespaceFiles(namespaceFiles)
.withInputFiles(inputFiles)
.withOutputFiles(outputFiles)
.withRunnerType(this.getRunner())
.withDockerOptions(this.getDocker())
.withContainerImage(this.containerImage)
.withTaskRunner(this.taskRunner)
.withRunnerType(this.getRunner().as(runContext, RunnerType.class))
.withDockerOptions(this.getDocker().as(runContext, DockerOptions.class))
.withContainerImage(this.containerImage.as(runContext, String.class))
.withTaskRunner(this.taskRunner.as(runContext, TaskRunner.class))
.withLogConsumer(new AbstractLogConsumer() {
@Override
public void accept(String line, Boolean isStdErr) {
Expand All @@ -174,14 +158,15 @@ public void accept(String line, Boolean isStdErr) {
.withEnableOutputDirectory(true); //force output files on task runners
Path workingDirectory = commandsWrapper.getWorkingDirectory();

if (profiles != null && !profiles.isEmpty()) {
String profileString = profiles.as(runContext, String.class);
if (profileString != null && !profileString.isEmpty()) {
if (Files.exists(Path.of(".profiles/profiles.yml"))) {
runContext.logger().warn("A 'profiles.yml' file already exist in the task working directory, it will be overridden.");
}

FileUtils.writeStringToFile(
new File(workingDirectory.resolve(".profile").toString(), "profiles.yml"),
runContext.render(profiles),
profileString,
StandardCharsets.UTF_8
);
}
Expand All @@ -207,26 +192,26 @@ public void accept(String line, Boolean isStdErr) {

private String createDbtCommand(RunContext runContext) throws IllegalVariableEvaluationException {
List<String> commands = new ArrayList<>(List.of(
runContext.render(dbtPath),
dbtPath.as(runContext, String.class),
"--log-format json"
));

if (this.debug) {
if (Boolean.TRUE.equals(this.debug.as(runContext, Boolean.class))) {
commands.add("--debug");
}

if (this.failFast) {
if (Boolean.TRUE.equals(this.failFast.as(runContext, Boolean.class))) {
commands.add("--fail-fast");
}

if (this.warnError) {
if (Boolean.TRUE.equals(this.warnError.as(runContext, Boolean.class))) {
commands.add("--warn-error");
}

commands.addAll(dbtCommands(runContext));

if (this.projectDir != null) {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + runContext.render(this.projectDir));
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + this.projectDir.as(runContext, String.class));
} else {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}");
}
Expand All @@ -235,11 +220,11 @@ private String createDbtCommand(RunContext runContext) throws IllegalVariableEva
}

protected void parseResults(RunContext runContext, Path workingDirectory, ScriptOutput scriptOutput) throws IllegalVariableEvaluationException, IOException {
String baseDir = this.projectDir != null ? runContext.render(this.projectDir) : "";
String baseDir = this.projectDir != null ? this.projectDir.as(runContext, String.class) : "";

File runResults = workingDirectory.resolve(baseDir + "target/run_results.json").toFile();

if (this.parseRunResults && runResults.exists()) {
if (this.parseRunResults.as(runContext, Boolean.class) && runResults.exists()) {
URI results = ResultParser.parseRunResult(runContext, runResults);
scriptOutput.getOutputFiles().put("run_results.json", results);
}
Expand Down
32 changes: 13 additions & 19 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package io.kestra.plugin.dbt.cli;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.runners.ScriptService;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

@SuperBuilder
@ToString
Expand All @@ -20,40 +20,34 @@ public abstract class AbstractRun extends AbstractDbt {
@Schema(
title = "Specify the number of threads to use while executing models."
)
@PluginProperty
Integer thread;
Property<Integer> thread;

@Builder.Default
@Schema(
title = "Whether dbt will drop incremental models and fully-recalculate the incremental table " +
"from the model definition."
)
@PluginProperty
Boolean fullRefresh = false;
Property<Boolean> fullRefresh = Property.of(Boolean.FALSE);

@Schema(
title = "Which target to load for the given profile"
)
@PluginProperty(dynamic = true)
String target;
Property<String> target;

@Schema(
title = "The selector name to use, as defined in selectors.yml"
)
@PluginProperty(dynamic = true)
String selector;
Property<String> selector;

@Schema(
title = "List of nodes to include"
)
@PluginProperty(dynamic = true)
java.util.List<String> select;
Property<List<String>> select;

@Schema(
title = "List of models to exclude"
)
@PluginProperty(dynamic = true)
java.util.List<String> exclude;
Property<List<String>> exclude;

abstract protected String dbtCommand();

Expand All @@ -67,24 +61,24 @@ protected java.util.List<String> dbtCommands(RunContext runContext) throws Illeg
commands.add("--threads " + this.thread);
}

if (this.fullRefresh) {
if (this.fullRefresh.as(runContext, Boolean.class)) {
commands.add("--full-refresh");
}

if (this.target != null) {
commands.add("--target " + runContext.render(this.target));
commands.add("--target " + this.target.as(runContext, String.class));
}

if (this.selector != null) {
commands.add("--selector " + runContext.render(this.selector));
commands.add("--selector " + this.selector.as(runContext, String.class));
}

if (this.select != null) {
commands.add("--select " + String.join(" ", runContext.render(this.select)));
commands.add("--select " + String.join(" ", this.select.asList(runContext, String.class)));
}

if (this.exclude != null) {
commands.add("--exclude " + String.join(" ", runContext.render(this.exclude)));
commands.add("--exclude " + String.join(" ", this.exclude.asList(runContext, String.class)));
}

return commands;
Expand Down
Loading

0 comments on commit df7f374

Please sign in to comment.