Skip to content

Commit

Permalink
refactor: remove deprecated methods (#163)
Browse files Browse the repository at this point in the history
dynamic props , remove deprecated methos
  • Loading branch information
mgabelle authored Dec 4, 2024
1 parent 85365b8 commit e01f676
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 97 deletions.
34 changes: 16 additions & 18 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.*;
import java.util.List;
import java.util.Map;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -143,13 +141,13 @@ public void setDockerOptions(Property<DockerOptions> dockerOptions) {
@Override
public ScriptOutput run(RunContext runContext) throws Exception {
CommandsWrapper commandsWrapper = new CommandsWrapper(runContext)
.withEnv(this.getEnv() != null ? this.getEnv().asMap(runContext, String.class, String.class) : new HashMap<>())
.withEnv(runContext.render(this.getEnv()).asMap(String.class, String.class))
.withNamespaceFiles(namespaceFiles)
.withInputFiles(inputFiles)
.withOutputFiles(outputFiles)
.withRunnerType(this.getRunner() != null ? this.getRunner().as(runContext, RunnerType.class) : null)
.withDockerOptions(this.getDocker() != null ? this.getDocker().as(runContext, DockerOptions.class) : null)
.withContainerImage(this.containerImage.as(runContext, String.class))
.withRunnerType(runContext.render(this.getRunner()).as(RunnerType.class).orElse(null))
.withDockerOptions(runContext.render(this.getDocker()).as(DockerOptions.class).orElse(null))
.withContainerImage(runContext.render(this.getContainerImage()).as(String.class).orElseThrow())
.withTaskRunner(this.taskRunner)
.withLogConsumer(new AbstractLogConsumer() {
@Override
Expand All @@ -160,15 +158,15 @@ public void accept(String line, Boolean isStdErr) {
.withEnableOutputDirectory(true); //force output files on task runners
Path workingDirectory = commandsWrapper.getWorkingDirectory();

String profileString = profiles != null ? profiles.as(runContext, String.class) : null;
if (profileString != null && !profileString.isEmpty()) {
Optional<String> profileString = runContext.render(profiles).as(String.class);
if (profileString.isPresent() && !profileString.get().isEmpty()) {
if (Files.exists(Path.of(".profiles/profiles.yml"))) {
runContext.logger().warn("A 'profiles.yml' file already exist in the task working directory, it will be overridden.");
}

FileUtils.writeStringToFile(
new File(workingDirectory.resolve(".profile").toString(), "profiles.yml"),
profileString,
profileString.get(),
StandardCharsets.UTF_8
);
}
Expand All @@ -194,26 +192,26 @@ public void accept(String line, Boolean isStdErr) {

private String createDbtCommand(RunContext runContext) throws IllegalVariableEvaluationException {
List<String> commands = new ArrayList<>(List.of(
dbtPath.as(runContext, String.class),
runContext.render(this.dbtPath).as(String.class).orElseThrow(),
"--log-format json"
));

if (Boolean.TRUE.equals(this.debug.as(runContext, Boolean.class))) {
if (Boolean.TRUE.equals(runContext.render(this.debug).as(Boolean.class).orElse(false))) {
commands.add("--debug");
}

if (Boolean.TRUE.equals(this.failFast.as(runContext, Boolean.class))) {
if (Boolean.TRUE.equals(runContext.render(this.failFast).as(Boolean.class).orElse(false))) {
commands.add("--fail-fast");
}

if (Boolean.TRUE.equals(this.warnError.as(runContext, Boolean.class))) {
if (Boolean.TRUE.equals(runContext.render(this.warnError).as(Boolean.class).orElse(false))) {
commands.add("--warn-error");
}

commands.addAll(dbtCommands(runContext));

if (this.projectDir != null) {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + this.projectDir.as(runContext, String.class));
if (runContext.render(this.projectDir).as(String.class).isPresent()) {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + runContext.render(this.projectDir).as(String.class).get());
} else {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}");
}
Expand All @@ -222,11 +220,11 @@ private String createDbtCommand(RunContext runContext) throws IllegalVariableEva
}

protected void parseResults(RunContext runContext, Path workingDirectory, ScriptOutput scriptOutput) throws IllegalVariableEvaluationException, IOException {
String baseDir = this.projectDir != null ? this.projectDir.as(runContext, String.class) : "";
String baseDir = runContext.render(this.projectDir).as(String.class).orElse("");

File runResults = workingDirectory.resolve(baseDir + "target/run_results.json").toFile();

if (this.parseRunResults.as(runContext, Boolean.class) && runResults.exists()) {
if (runContext.render(this.parseRunResults).as(Boolean.class).orElse(true) && runResults.exists()) {
URI results = ResultParser.parseRunResult(runContext, runResults);
scriptOutput.getOutputFiles().put("run_results.json", results);
}
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,28 @@ protected java.util.List<String> dbtCommands(RunContext runContext) throws Illeg
this.dbtCommand(),
"--profiles-dir {{" + ScriptService.VAR_WORKING_DIR + "}}/.profile"));

if (this.thread != null) {
commands.add("--threads " + this.thread);
if (runContext.render(this.thread).as(Integer.class).isPresent()) {
commands.add("--threads " + runContext.render(this.thread).as(Integer.class).get());
}

if (this.fullRefresh.as(runContext, Boolean.class)) {
if (runContext.render(this.fullRefresh).as(Boolean.class).orElse(false)) {
commands.add("--full-refresh");
}

if (this.target != null) {
commands.add("--target " + this.target.as(runContext, String.class));
if (runContext.render(this.target).as(String.class).isPresent()) {
commands.add("--target " + runContext.render(this.target).as(String.class).get());
}

if (this.selector != null) {
commands.add("--selector " + this.selector.as(runContext, String.class));
if (runContext.render(this.selector).as(String.class).isPresent()) {
commands.add("--selector " + runContext.render(this.target).as(String.class).get());
}

if (this.select != null) {
commands.add("--select " + String.join(" ", this.select.asList(runContext, String.class)));
if (!runContext.render(this.select).asList(String.class).isEmpty()) {
commands.add("--select " + String.join(" ", runContext.render(this.select).asList(String.class)));
}

if (this.exclude != null) {
commands.add("--exclude " + String.join(" ", this.exclude.asList(runContext, String.class)));
if (!runContext.render(this.exclude).asList(String.class).isEmpty()) {
commands.add("--exclude " + String.join(" ", runContext.render(this.exclude).asList(String.class)));
}

return commands;
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
memory:
memory: 1GB
memory: 1GB
containerImage: python:3.11-slim
beforeCommands:
- pip install uv
Expand All @@ -173,7 +173,7 @@
code = """
id: dwh_and_analytics
namespace: company.team
tasks:
- id: dbt
type: io.kestra.plugin.core.flow.WorkingDirectory
Expand All @@ -182,7 +182,7 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-example
branch: master
- id: dbt_build
type: io.kestra.plugin.dbt.cli.DbtCLI
taskRunner:
Expand Down Expand Up @@ -236,7 +236,7 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-example
branch: master
- id: dbt_build
type: io.kestra.plugin.dbt.cli.DbtCLI
taskRunner:
Expand Down Expand Up @@ -360,7 +360,7 @@ public ScriptOutput run(RunContext runContext) throws Exception {

//Check/fail if a KV store exists with given namespace
if(this.getStoreManifest() != null) {
storeManifestKvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class));
storeManifestKvStore = runContext.namespaceKv(runContext.render(this.getStoreManifest().getNamespace()).as(String.class).orElseThrow());
}

CommandsWrapper commands = this.commands(runContext)
Expand All @@ -372,16 +372,17 @@ public void accept(String line, Boolean isStdErr) {
}
});

Path projectWorkingDirectory = projectDir == null ? commands.getWorkingDirectory() : commands.getWorkingDirectory().resolve(projectDir.as(runContext, String.class));
var renderedProjectDir = runContext.render(projectDir).as(String.class);
Path projectWorkingDirectory = renderedProjectDir.map(s -> commands.getWorkingDirectory().resolve(s)).orElseGet(commands::getWorkingDirectory);

//Load manifest from KV store
if(this.getLoadManifest() != null) {
KVStore loadManifestKvStore = runContext.namespaceKv(this.getLoadManifest().getNamespace().as(runContext, String.class));
KVStore loadManifestKvStore = runContext.namespaceKv(runContext.render(this.getLoadManifest().getNamespace()).as(String.class).orElseThrow());
fetchAndStoreManifestIfExists(runContext, loadManifestKvStore, projectWorkingDirectory);
}

//Create profiles.yml
String profilesString = profiles == null ? null : profiles.as(runContext, String.class);
String profilesString = runContext.render(profiles).as(String.class).orElse(null);
if (profilesString != null && !profilesString.isEmpty()) {
var profileFile = new File(commands.getWorkingDirectory().toString(), "profiles.yml");
if (profileFile.exists()) {
Expand Down Expand Up @@ -416,15 +417,15 @@ public void accept(String line, Boolean isStdErr) {
.run();

//Parse run results
if (this.parseRunResults.as(runContext, Boolean.class) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) {
if (runContext.render(this.parseRunResults).as(Boolean.class).orElse(Boolean.TRUE) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) {
URI results = ResultParser.parseRunResult(runContext, projectWorkingDirectory.resolve("target/run_results.json").toFile());
run.getOutputFiles().put("run_results.json", results);
}

File manifestFile = projectWorkingDirectory.resolve("target/manifest.json").toFile();
if (manifestFile.exists()) {
if(this.getStoreManifest() != null) {
final String key = this.getStoreManifest().getKey().as(runContext, String.class);
final String key = runContext.render(this.getStoreManifest().getKey()).as(String.class).orElseThrow();
storeManifestKvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath()))));
}

Expand All @@ -437,7 +438,7 @@ public void accept(String line, Boolean isStdErr) {
}

private void fetchAndStoreManifestIfExists(RunContext runContext, KVStore loadManifestKvStore, Path projectWorkingDirectory) throws IOException, ResourceExpiredException, IllegalVariableEvaluationException {
Optional<KVValue> manifestValue = loadManifestKvStore.getValue(this.getLoadManifest().getKey().as(runContext, String.class));
Optional<KVValue> manifestValue = loadManifestKvStore.getValue(runContext.render(this.getLoadManifest().getKey()).as(String.class).get());

if(manifestValue.isEmpty() || manifestValue.get().value() == null || StringUtils.isBlank(manifestValue.get().value().toString())) {
runContext.logger().warn("Property `loadManifest` has been used but no manifest has been found in the KV Store.");
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/io/kestra/plugin/dbt/cli/Setup.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-demo
branch: main
- id: dbt_setup
type: io.kestra.plugin.dbt.cli.Setup
requirements:
Expand All @@ -83,7 +83,7 @@
extensions:
- parquet
target: dev
- id: dbt_build
type: io.kestra.plugin.dbt.cli.Build
"""
Expand Down Expand Up @@ -174,14 +174,14 @@ public ScriptOutput run(RunContext runContext) throws Exception {
CommandsWrapper commandsWrapper = this.commands(runContext);
Path workingDirectory = commandsWrapper.getWorkingDirectory();

List<String> commands = this.virtualEnvCommand(runContext, workingDirectory, this.requirements.asList(runContext, String.class));
List<String> commands = this.virtualEnvCommand(runContext, workingDirectory, runContext.render(this.requirements).asList(String.class));

// write profile
File profileDir = workingDirectory.resolve(".profile").toFile();
// noinspection ResultOfMethodCallIgnored
profileDir.mkdirs();

String profilesContent = profilesContent(runContext, profiles.as(runContext, Object.class));
String profilesContent = profilesContent(runContext, runContext.render(this.profiles).as(Object.class).orElseThrow());
FileUtils.writeStringToFile(
new File(profileDir, "profiles.yml"),
profilesContent,
Expand Down Expand Up @@ -235,6 +235,8 @@ private String profilesContent(RunContext runContext, Object profiles) throws Il
}

private Map<String, String> finalInputFiles(RunContext runContext) throws IOException, IllegalVariableEvaluationException {
return this.inputFiles != null ? new HashMap<>(PluginUtilsService.transformInputFiles(runContext, this.inputFiles.as(runContext, Object.class))) : new HashMap<>();
return runContext.render(this.inputFiles).as(Object.class).isPresent() ?
new HashMap<>(PluginUtilsService.transformInputFiles(runContext, runContext.render(this.inputFiles).as(Object.class).orElseThrow())) :
new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluat
httpConfig.setMaxContentLength(Integer.MAX_VALUE);
httpConfig.setReadTimeout(HTTP_READ_TIMEOUT);

DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(baseUrl.as(runContext, String.class)).toURL(), httpConfig);
DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(runContext.render(baseUrl).as(String.class).orElseThrow()).toURL(), httpConfig);
client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry);

return client;
Expand All @@ -78,7 +78,7 @@ protected <REQ, RES> HttpResponse<RES> request(RunContext runContext,
Duration timeout) throws HttpClientResponseException {
try {
request = request
.bearerAuth(this.token.as(runContext, String.class))
.bearerAuth(runContext.render(this.token).as(String.class).orElseThrow())
.contentType(MediaType.APPLICATION_JSON);

try (HttpClient client = this.client(runContext)) {
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public CheckStatus.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();

// Check rendered runId provided is an Integer
Long runIdRendered = Long.parseLong(this.runId.as(runContext, String.class));
Long runIdRendered = Long.parseLong(runContext.render(this.runId).as(String.class).orElseThrow());

// wait for end
RunResponse finalRunResponse = Await.until(
Expand Down Expand Up @@ -139,8 +139,8 @@ public CheckStatus.Output run(RunContext runContext) throws Exception {

return null;
}),
this.pollFrequency.as(runContext, Duration.class),
this.maxDuration.as(runContext, Duration.class)
runContext.render(this.pollFrequency).as(Duration.class).orElseThrow(),
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
);

// final response
Expand All @@ -155,7 +155,7 @@ public CheckStatus.Output run(RunContext runContext) throws Exception {
Path runResultsArtifact = downloadArtifacts(runContext, runIdRendered, "run_results.json");
Path manifestArtifact = downloadArtifacts(runContext, runIdRendered, "manifest.json");

if (this.parseRunResults.as(runContext, Boolean.class)) {
if (runContext.render(this.parseRunResults).as(Boolean.class).orElseThrow()) {
ResultParser.parseRunResult(runContext, runResultsArtifact.toFile());
}

Expand Down Expand Up @@ -207,12 +207,12 @@ private Optional<RunResponse> fetchRunResponse(RunContext runContext, Long id, B
)
)
.expand(Map.of(
"accountId", this.accountId.as(runContext, String.class),
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", id
))
),
Argument.of(RunResponse.class),
maxDuration.as(runContext, Duration.class)
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
)
.getBody();
}
Expand All @@ -227,7 +227,7 @@ private Path downloadArtifacts(RunContext runContext, Long runId, String path) t
UriTemplate
.of("/api/v2/accounts/{accountId}/runs/{runId}/artifacts/{path}")
.expand(Map.of(
"accountId", this.accountId.as(runContext, String.class),
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", runId,
"path", path
))
Expand Down
Loading

0 comments on commit e01f676

Please sign in to comment.