Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove deprecated methods #163

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractDbt.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.*;
import java.util.List;
import java.util.Map;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -143,13 +141,13 @@ public void setDockerOptions(Property<DockerOptions> dockerOptions) {
@Override
public ScriptOutput run(RunContext runContext) throws Exception {
CommandsWrapper commandsWrapper = new CommandsWrapper(runContext)
.withEnv(this.getEnv() != null ? this.getEnv().asMap(runContext, String.class, String.class) : new HashMap<>())
.withEnv(runContext.render(this.getEnv()).asMap(String.class, String.class))
.withNamespaceFiles(namespaceFiles)
.withInputFiles(inputFiles)
.withOutputFiles(outputFiles)
.withRunnerType(this.getRunner() != null ? this.getRunner().as(runContext, RunnerType.class) : null)
.withDockerOptions(this.getDocker() != null ? this.getDocker().as(runContext, DockerOptions.class) : null)
.withContainerImage(this.containerImage.as(runContext, String.class))
.withRunnerType(runContext.render(this.getRunner()).as(RunnerType.class).orElse(null))
.withDockerOptions(runContext.render(this.getDocker()).as(DockerOptions.class).orElse(null))
.withContainerImage(runContext.render(this.getContainerImage()).as(String.class).orElseThrow())
.withTaskRunner(this.taskRunner)
.withLogConsumer(new AbstractLogConsumer() {
@Override
Expand All @@ -160,15 +158,15 @@ public void accept(String line, Boolean isStdErr) {
.withEnableOutputDirectory(true); //force output files on task runners
Path workingDirectory = commandsWrapper.getWorkingDirectory();

String profileString = profiles != null ? profiles.as(runContext, String.class) : null;
if (profileString != null && !profileString.isEmpty()) {
Optional<String> profileString = runContext.render(profiles).as(String.class);
if (profileString.isPresent() && !profileString.get().isEmpty()) {
if (Files.exists(Path.of(".profiles/profiles.yml"))) {
runContext.logger().warn("A 'profiles.yml' file already exist in the task working directory, it will be overridden.");
}

FileUtils.writeStringToFile(
new File(workingDirectory.resolve(".profile").toString(), "profiles.yml"),
profileString,
profileString.get(),
StandardCharsets.UTF_8
);
}
Expand All @@ -194,26 +192,26 @@ public void accept(String line, Boolean isStdErr) {

private String createDbtCommand(RunContext runContext) throws IllegalVariableEvaluationException {
List<String> commands = new ArrayList<>(List.of(
dbtPath.as(runContext, String.class),
runContext.render(this.dbtPath).as(String.class).orElseThrow(),
"--log-format json"
));

if (Boolean.TRUE.equals(this.debug.as(runContext, Boolean.class))) {
if (Boolean.TRUE.equals(runContext.render(this.debug).as(Boolean.class).orElse(false))) {
commands.add("--debug");
}

if (Boolean.TRUE.equals(this.failFast.as(runContext, Boolean.class))) {
if (Boolean.TRUE.equals(runContext.render(this.failFast).as(Boolean.class).orElse(false))) {
commands.add("--fail-fast");
}

if (Boolean.TRUE.equals(this.warnError.as(runContext, Boolean.class))) {
if (Boolean.TRUE.equals(runContext.render(this.warnError).as(Boolean.class).orElse(false))) {
commands.add("--warn-error");
}

commands.addAll(dbtCommands(runContext));

if (this.projectDir != null) {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + this.projectDir.as(runContext, String.class));
if (runContext.render(this.projectDir).as(String.class).isPresent()) {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}" + runContext.render(this.projectDir).as(String.class).get());
} else {
commands.add("--project-dir {{" + ScriptService.VAR_WORKING_DIR + "}}");
}
Expand All @@ -222,11 +220,11 @@ private String createDbtCommand(RunContext runContext) throws IllegalVariableEva
}

protected void parseResults(RunContext runContext, Path workingDirectory, ScriptOutput scriptOutput) throws IllegalVariableEvaluationException, IOException {
String baseDir = this.projectDir != null ? this.projectDir.as(runContext, String.class) : "";
String baseDir = runContext.render(this.projectDir).as(String.class).orElse("");

File runResults = workingDirectory.resolve(baseDir + "target/run_results.json").toFile();

if (this.parseRunResults.as(runContext, Boolean.class) && runResults.exists()) {
if (runContext.render(this.parseRunResults).as(Boolean.class).orElse(true) && runResults.exists()) {
URI results = ResultParser.parseRunResult(runContext, runResults);
scriptOutput.getOutputFiles().put("run_results.json", results);
}
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/kestra/plugin/dbt/cli/AbstractRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,28 @@ protected java.util.List<String> dbtCommands(RunContext runContext) throws Illeg
this.dbtCommand(),
"--profiles-dir {{" + ScriptService.VAR_WORKING_DIR + "}}/.profile"));

if (this.thread != null) {
commands.add("--threads " + this.thread);
if (runContext.render(this.thread).as(Integer.class).isPresent()) {
commands.add("--threads " + runContext.render(this.thread).as(Integer.class).get());
}

if (this.fullRefresh.as(runContext, Boolean.class)) {
if (runContext.render(this.fullRefresh).as(Boolean.class).orElse(false)) {
commands.add("--full-refresh");
}

if (this.target != null) {
commands.add("--target " + this.target.as(runContext, String.class));
if (runContext.render(this.target).as(String.class).isPresent()) {
commands.add("--target " + runContext.render(this.target).as(String.class).get());
}

if (this.selector != null) {
commands.add("--selector " + this.selector.as(runContext, String.class));
if (runContext.render(this.selector).as(String.class).isPresent()) {
commands.add("--selector " + runContext.render(this.target).as(String.class).get());
}

if (this.select != null) {
commands.add("--select " + String.join(" ", this.select.asList(runContext, String.class)));
if (!runContext.render(this.select).asList(String.class).isEmpty()) {
commands.add("--select " + String.join(" ", runContext.render(this.select).asList(String.class)));
}

if (this.exclude != null) {
commands.add("--exclude " + String.join(" ", this.exclude.asList(runContext, String.class)));
if (!runContext.render(this.exclude).asList(String.class).isEmpty()) {
commands.add("--exclude " + String.join(" ", runContext.render(this.exclude).asList(String.class)));
}

return commands;
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
taskRunner:
type: io.kestra.plugin.scripts.runner.docker.Docker
memory:
memory: 1GB
memory: 1GB
containerImage: python:3.11-slim
beforeCommands:
- pip install uv
Expand All @@ -173,7 +173,7 @@
code = """
id: dwh_and_analytics
namespace: company.team

tasks:
- id: dbt
type: io.kestra.plugin.core.flow.WorkingDirectory
Expand All @@ -182,7 +182,7 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-example
branch: master

- id: dbt_build
type: io.kestra.plugin.dbt.cli.DbtCLI
taskRunner:
Expand Down Expand Up @@ -236,7 +236,7 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-example
branch: master

- id: dbt_build
type: io.kestra.plugin.dbt.cli.DbtCLI
taskRunner:
Expand Down Expand Up @@ -360,7 +360,7 @@ public ScriptOutput run(RunContext runContext) throws Exception {

//Check/fail if a KV store exists with given namespace
if(this.getStoreManifest() != null) {
storeManifestKvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class));
storeManifestKvStore = runContext.namespaceKv(runContext.render(this.getStoreManifest().getNamespace()).as(String.class).orElseThrow());
}

CommandsWrapper commands = this.commands(runContext)
Expand All @@ -372,16 +372,17 @@ public void accept(String line, Boolean isStdErr) {
}
});

Path projectWorkingDirectory = projectDir == null ? commands.getWorkingDirectory() : commands.getWorkingDirectory().resolve(projectDir.as(runContext, String.class));
var renderedProjectDir = runContext.render(projectDir).as(String.class);
Path projectWorkingDirectory = renderedProjectDir.map(s -> commands.getWorkingDirectory().resolve(s)).orElseGet(commands::getWorkingDirectory);

//Load manifest from KV store
if(this.getLoadManifest() != null) {
KVStore loadManifestKvStore = runContext.namespaceKv(this.getLoadManifest().getNamespace().as(runContext, String.class));
KVStore loadManifestKvStore = runContext.namespaceKv(runContext.render(this.getLoadManifest().getNamespace()).as(String.class).orElseThrow());
fetchAndStoreManifestIfExists(runContext, loadManifestKvStore, projectWorkingDirectory);
}

//Create profiles.yml
String profilesString = profiles == null ? null : profiles.as(runContext, String.class);
String profilesString = runContext.render(profiles).as(String.class).orElse(null);
if (profilesString != null && !profilesString.isEmpty()) {
var profileFile = new File(commands.getWorkingDirectory().toString(), "profiles.yml");
if (profileFile.exists()) {
Expand Down Expand Up @@ -416,15 +417,15 @@ public void accept(String line, Boolean isStdErr) {
.run();

//Parse run results
if (this.parseRunResults.as(runContext, Boolean.class) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) {
if (runContext.render(this.parseRunResults).as(Boolean.class).orElse(Boolean.TRUE) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) {
URI results = ResultParser.parseRunResult(runContext, projectWorkingDirectory.resolve("target/run_results.json").toFile());
run.getOutputFiles().put("run_results.json", results);
}

File manifestFile = projectWorkingDirectory.resolve("target/manifest.json").toFile();
if (manifestFile.exists()) {
if(this.getStoreManifest() != null) {
final String key = this.getStoreManifest().getKey().as(runContext, String.class);
final String key = runContext.render(this.getStoreManifest().getKey()).as(String.class).orElseThrow();
storeManifestKvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath()))));
}

Expand All @@ -437,7 +438,7 @@ public void accept(String line, Boolean isStdErr) {
}

private void fetchAndStoreManifestIfExists(RunContext runContext, KVStore loadManifestKvStore, Path projectWorkingDirectory) throws IOException, ResourceExpiredException, IllegalVariableEvaluationException {
Optional<KVValue> manifestValue = loadManifestKvStore.getValue(this.getLoadManifest().getKey().as(runContext, String.class));
Optional<KVValue> manifestValue = loadManifestKvStore.getValue(runContext.render(this.getLoadManifest().getKey()).as(String.class).get());

if(manifestValue.isEmpty() || manifestValue.get().value() == null || StringUtils.isBlank(manifestValue.get().value().toString())) {
runContext.logger().warn("Property `loadManifest` has been used but no manifest has been found in the KV Store.");
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/io/kestra/plugin/dbt/cli/Setup.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
type: io.kestra.plugin.git.Clone
url: https://github.com/kestra-io/dbt-demo
branch: main

- id: dbt_setup
type: io.kestra.plugin.dbt.cli.Setup
requirements:
Expand All @@ -83,7 +83,7 @@
extensions:
- parquet
target: dev

- id: dbt_build
type: io.kestra.plugin.dbt.cli.Build
"""
Expand Down Expand Up @@ -174,14 +174,14 @@ public ScriptOutput run(RunContext runContext) throws Exception {
CommandsWrapper commandsWrapper = this.commands(runContext);
Path workingDirectory = commandsWrapper.getWorkingDirectory();

List<String> commands = this.virtualEnvCommand(runContext, workingDirectory, this.requirements.asList(runContext, String.class));
List<String> commands = this.virtualEnvCommand(runContext, workingDirectory, runContext.render(this.requirements).asList(String.class));

// write profile
File profileDir = workingDirectory.resolve(".profile").toFile();
// noinspection ResultOfMethodCallIgnored
profileDir.mkdirs();

String profilesContent = profilesContent(runContext, profiles.as(runContext, Object.class));
String profilesContent = profilesContent(runContext, runContext.render(this.profiles).as(Object.class).orElseThrow());
FileUtils.writeStringToFile(
new File(profileDir, "profiles.yml"),
profilesContent,
Expand Down Expand Up @@ -235,6 +235,8 @@ private String profilesContent(RunContext runContext, Object profiles) throws Il
}

private Map<String, String> finalInputFiles(RunContext runContext) throws IOException, IllegalVariableEvaluationException {
return this.inputFiles != null ? new HashMap<>(PluginUtilsService.transformInputFiles(runContext, this.inputFiles.as(runContext, Object.class))) : new HashMap<>();
return runContext.render(this.inputFiles).as(Object.class).isPresent() ?
new HashMap<>(PluginUtilsService.transformInputFiles(runContext, runContext.render(this.inputFiles).as(Object.class).orElseThrow())) :
new HashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected HttpClient client(RunContext runContext) throws IllegalVariableEvaluat
httpConfig.setMaxContentLength(Integer.MAX_VALUE);
httpConfig.setReadTimeout(HTTP_READ_TIMEOUT);

DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(baseUrl.as(runContext, String.class)).toURL(), httpConfig);
DefaultHttpClient client = (DefaultHttpClient) FACTORY.createClient(URI.create(runContext.render(baseUrl).as(String.class).orElseThrow()).toURL(), httpConfig);
client.setMediaTypeCodecRegistry(mediaTypeCodecRegistry);

return client;
Expand All @@ -78,7 +78,7 @@ protected <REQ, RES> HttpResponse<RES> request(RunContext runContext,
Duration timeout) throws HttpClientResponseException {
try {
request = request
.bearerAuth(this.token.as(runContext, String.class))
.bearerAuth(runContext.render(this.token).as(String.class).orElseThrow())
.contentType(MediaType.APPLICATION_JSON);

try (HttpClient client = this.client(runContext)) {
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/kestra/plugin/dbt/cloud/CheckStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public CheckStatus.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();

// Check rendered runId provided is an Integer
Long runIdRendered = Long.parseLong(this.runId.as(runContext, String.class));
Long runIdRendered = Long.parseLong(runContext.render(this.runId).as(String.class).orElseThrow());

// wait for end
RunResponse finalRunResponse = Await.until(
Expand Down Expand Up @@ -139,8 +139,8 @@ public CheckStatus.Output run(RunContext runContext) throws Exception {

return null;
}),
this.pollFrequency.as(runContext, Duration.class),
this.maxDuration.as(runContext, Duration.class)
runContext.render(this.pollFrequency).as(Duration.class).orElseThrow(),
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
);

// final response
Expand All @@ -155,7 +155,7 @@ public CheckStatus.Output run(RunContext runContext) throws Exception {
Path runResultsArtifact = downloadArtifacts(runContext, runIdRendered, "run_results.json");
Path manifestArtifact = downloadArtifacts(runContext, runIdRendered, "manifest.json");

if (this.parseRunResults.as(runContext, Boolean.class)) {
if (runContext.render(this.parseRunResults).as(Boolean.class).orElseThrow()) {
ResultParser.parseRunResult(runContext, runResultsArtifact.toFile());
}

Expand Down Expand Up @@ -207,12 +207,12 @@ private Optional<RunResponse> fetchRunResponse(RunContext runContext, Long id, B
)
)
.expand(Map.of(
"accountId", this.accountId.as(runContext, String.class),
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", id
))
),
Argument.of(RunResponse.class),
maxDuration.as(runContext, Duration.class)
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
)
.getBody();
}
Expand All @@ -227,7 +227,7 @@ private Path downloadArtifacts(RunContext runContext, Long runId, String path) t
UriTemplate
.of("/api/v2/accounts/{accountId}/runs/{runId}/artifacts/{path}")
.expand(Map.of(
"accountId", this.accountId.as(runContext, String.class),
"accountId", runContext.render(this.accountId).as(String.class).orElseThrow(),
"runId", runId,
"path", path
))
Expand Down
Loading
Loading