Skip to content

Commit

Permalink
feat: add storeManifest and loadManifest (#152)
Browse files Browse the repository at this point in the history
* feat: add storeManifest and loadManifest

* doc: update full example flow with storeManifest

add doc

bump andrcuns/allure-publish-action from 2.7.1 to 2.8.0 #153

* feat: implement loadManifest property

* fix: build test

* doc: update flow with loadManifest example

close #45
  • Loading branch information
mgabelle authored Oct 22, 2024
1 parent b9b9c70 commit 4d0b183
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
- uses: rlespinasse/github-slug-action@v4

- name: Publish allure report
uses: andrcuns/allure-publish-action@v2.7.1
uses: andrcuns/allure-publish-action@v2.8.0
if: ${{ always() && env.GOOGLE_SERVICE_ACCOUNT != 0 && (github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '') }}
env:
GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down
74 changes: 67 additions & 7 deletions src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.kestra.core.models.tasks.runners.TaskRunner;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.plugin.dbt.ResultParser;
import io.kestra.plugin.scripts.exec.AbstractExecScript;
import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions;
Expand All @@ -18,17 +20,14 @@
import io.kestra.plugin.scripts.runner.docker.Docker;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -149,6 +148,12 @@
- dbt deps --project-dir dbt --target prod
- dbt build --project-dir dbt --target prod
projectDir: dbt
loadManifest:
key: manifest.json
namespace: company.team
storeManifest:
key: manifest.json
namespace: company.team
profiles: |
my_dbt_project:
outputs:
Expand Down Expand Up @@ -220,6 +225,18 @@ public class DbtCLI extends AbstractExecScript {
@Builder.Default
protected String containerImage = DEFAULT_IMAGE;

@Schema(
title = "Store manifest.",
description = "Use this field to persist your manifest.json in the KV Store."
)
protected KvStoreManifest storeManifest;

@Schema(
title = "Load manifest.",
description = "Use this field to retrieve an existing manifest.json in the KV Store and put it in the inputFiles."
)
protected KvStoreManifest loadManifest;

@Override
protected DockerOptions injectDefaults(DockerOptions original) {
if (original == null) {
Expand All @@ -239,6 +256,13 @@ protected DockerOptions injectDefaults(DockerOptions original) {

@Override
public ScriptOutput run(RunContext runContext) throws Exception {
KVStore storeManifestKvStore = null;

//Resolve the KV store up front so the task fails early if none exists for the given namespace
if(this.getStoreManifest() != null) {
storeManifestKvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class));
}

CommandsWrapper commands = this.commands(runContext)
.withEnableOutputDirectory(true) // force the output dir, so we can get the run_results.json and manifest.json files on each task runners
.withLogConsumer(new AbstractLogConsumer() {
Expand All @@ -250,6 +274,21 @@ public void accept(String line, Boolean isStdErr) {

Path projectWorkingDirectory = projectDir == null ? commands.getWorkingDirectory() : commands.getWorkingDirectory().resolve(projectDir.as(runContext, String.class));

// Load manifest from KV store: when `loadManifest` is set, copy the stored value into
// <project dir>/target/manifest.json before any command runs.
if (this.getLoadManifest() != null) {
    // Render the namespace and key once so the error below can name the exact entry looked up.
    final String loadNamespace = this.getLoadManifest().getNamespace().as(runContext, String.class);
    KVStore loadManifestKvStore = runContext.namespaceKv(loadNamespace);
    final String loadKey = this.getLoadManifest().getKey().as(runContext, String.class);

    // A missing entry used to surface as a bare "No value present" from Optional.get();
    // keep the same exception type but say which key/namespace was not found.
    Object manifestValue = loadManifestKvStore.getValue(loadKey)
        .orElseThrow(() -> new java.util.NoSuchElementException(
            "No manifest found in the KV store for key '" + loadKey + "' in namespace '" + loadNamespace + "'"
        ))
        .value();

    var manifestFile = new File(projectWorkingDirectory.toString(), "target/manifest.json");

    // The stored value is serialized back to JSON text before being written to disk.
    FileUtils.writeStringToFile(
        manifestFile,
        JacksonMapper.ofJson().writeValueAsString(manifestValue),
        StandardCharsets.UTF_8
    );
}

//Create profiles.yml
String profilesString = profiles == null ? null : profiles.as(runContext, String.class);
if (profilesString != null && !profilesString.isEmpty()) {
var profileFile = new File(commands.getWorkingDirectory().toString(), "profiles.yml");
Expand All @@ -264,6 +303,7 @@ public void accept(String line, Boolean isStdErr) {
);
}

//Create and run commands
List<String> commandsArgs = ScriptService.scriptCommands(
this.interpreter,
this.getBeforeCommandsWithOptions(),
Expand All @@ -283,16 +323,36 @@ public void accept(String line, Boolean isStdErr) {
.withCommands(commandsArgs)
.run();

//Parse run results
if (this.parseRunResults.as(runContext, Boolean.class) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) {
URI results = ResultParser.parseRunResult(runContext, projectWorkingDirectory.resolve("target/run_results.json").toFile());
run.getOutputFiles().put("run_results.json", results);
}

if (projectWorkingDirectory.resolve("target/manifest.json").toFile().exists()) {
URI manifest = ResultParser.parseManifest(runContext, projectWorkingDirectory.resolve("target/manifest.json").toFile());
File manifestFile = projectWorkingDirectory.resolve("target/manifest.json").toFile();
if (manifestFile.exists()) {
if(this.storeManifest != null) {
final String key = this.getStoreManifest().getKey().as(runContext, String.class);
storeManifestKvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath()))));
}

URI manifest = ResultParser.parseManifest(runContext, manifestFile);
run.getOutputFiles().put("manifest.json", manifest);

}

return run;
}

/**
 * Identifies an entry of the KV store (a namespace plus a key) that holds a dbt
 * {@code manifest.json}.
 *
 * <p>Used as the type of both the {@code storeManifest} and {@code loadManifest} task
 * properties. Both fields are {@code Property<String>} values that the task renders
 * against the run context before use. Instances are created through the Lombok-generated
 * builder and read through the Lombok-generated getters.
 */
@Builder
@Getter
public static class KvStoreManifest {
// Key of the KV store entry containing the manifest.json; required.
@NotNull
@Schema(title = "Key", description = "KV store key containing the manifest.json")
Property<String> key;

// Namespace whose KV store contains the manifest.json; required.
@NotNull
@Schema(title = "Namespace", description = "KV store namespace containing the manifest.json")
Property<String> namespace;
}
}
33 changes: 17 additions & 16 deletions src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,22 @@

@KestraTest
class BuildTest {
private static final String PROFILES = """
unit-kestra:
outputs:
dev:
dataset: kestra_unit_test_us
fixed_retries: 1
location: US
method: service-account
priority: interactive
project: kestra-unit-test
threads: 1
timeout_seconds: 300
type: bigquery
keyfile: sa.json
target: dev
""";
@Inject
private RunContextFactory runContextFactory;

Expand All @@ -47,22 +63,7 @@ void run() throws Exception {
.id(IdUtils.create())
.type(Setup.class.getName())
.taskRunner(Process.instance())
.profiles(Property.of(Map.of(
"unit-kestra", Map.of(
"outputs", Map.of(
"dev", Map.of("dataset", "kestra_unit_test_us",
"fixed_retries", "1",
"location", "US",
"method", "oauth",
"priority", "interactive",
"project", "kestra-unit-test",
"threads", "1",
"timeout_seconds", "300",
"type", "bigquery"
)
),
"target", "dev"
))))
.profiles(Property.of(PROFILES))
.requirements(Property.of(List.of("dbt-bigquery")))
.build();

Expand Down
112 changes: 96 additions & 16 deletions src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.storages.kv.KVStore;
import io.kestra.core.storages.kv.KVValueAndMetadata;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput;
import io.kestra.core.junit.annotations.KestraTest;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
Expand All @@ -25,9 +30,33 @@

@KestraTest
class DbtCLITest {
@Inject
StorageInterface storageInterface;

@Inject
private RunContextFactory runContextFactory;

private final String NAMESPACE_ID = "io.kestra.plugin.dbt.cli.dbtclitest";

private final String MANIFEST_KEY = "manifest.json";

private static final String PROFILES = """
unit-kestra:
outputs:
dev:
dataset: kestra_unit_test_us
fixed_retries: 1
location: US
method: service-account
priority: interactive
project: kestra-unit-test
threads: 1
timeout_seconds: 300
type: bigquery
keyfile: sa.json
target: dev
""";

public void copyFolder(Path src, Path dest) throws IOException {
try (Stream<Path> stream = Files.walk(src)) {
stream
Expand All @@ -44,22 +73,7 @@ void run() throws Exception {
DbtCLI execute = DbtCLI.builder()
.id(IdUtils.create())
.type(DbtCLI.class.getName())
.profiles(Property.of("""
unit-kestra:
outputs:
dev:
dataset: kestra_unit_test_us
fixed_retries: 1
location: US
method: service-account
priority: interactive
project: kestra-unit-test
threads: 1
timeout_seconds: 300
type: bigquery
keyfile: sa.json
target: dev
""")
.profiles(Property.of(PROFILES)
)
.containerImage("ghcr.io/kestra-io/dbt-bigquery:latest")
.commands(List.of("dbt build"))
Expand All @@ -76,6 +90,72 @@ void run() throws Exception {
assertThat(runOutput.getExitCode(), is(0));
}

@Test
void testDbtCliWithStoreManifest_manifestShouldBePresentInKvStore() throws Exception {
    // KV entry the task is expected to write the generated manifest to.
    DbtCLI.KvStoreManifest manifestDestination = DbtCLI.KvStoreManifest.builder()
        .key(Property.of(MANIFEST_KEY))
        .namespace(Property.of(NAMESPACE_ID))
        .build();

    DbtCLI task = DbtCLI.builder()
        .id(IdUtils.create())
        .type(DbtCLI.class.getName())
        .profiles(Property.of(PROFILES))
        .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest")
        .commands(List.of("dbt build"))
        .storeManifest(manifestDestination)
        .build();

    RunContext context = TestsUtils.mockRunContext(runContextFactory, task, Map.of());

    // Stage the sample dbt project and the sa.json credentials file in the working directory.
    Path taskDir = context.workingDir().path(true);
    var projectResource = Objects.requireNonNull(this.getClass().getClassLoader().getResource("project"));
    copyFolder(Path.of(projectResource.getPath()), taskDir);
    createSaFile(taskDir);

    ScriptOutput output = task.run(context);
    assertThat(output.getExitCode(), is(0));

    // The manifest must now exist in the KV store and describe the test project.
    KVStore manifestStore = context.namespaceKv(NAMESPACE_ID);
    assertThat(manifestStore.get(MANIFEST_KEY).isPresent(), is(true));

    Object storedValue = manifestStore.getValue(MANIFEST_KEY).get().value();
    Map<String, Object> manifest = (Map<String, Object>) storedValue;
    Map<String, Object> metadata = (Map<String, Object>) manifest.get("metadata");
    assertThat(metadata.get("project_name"), is("unit_kestra"));
}

@Disabled("To run put a manifest.json under src/test/resources/manifest/")
@Test
void testDbtWithLoadManifest_manifestShouldBeLoadedFromKvStore() throws Exception {
    // KV entry the task is expected to read the manifest from.
    DbtCLI.KvStoreManifest manifestSource = DbtCLI.KvStoreManifest.builder()
        .key(Property.of(MANIFEST_KEY))
        .namespace(Property.of(NAMESPACE_ID))
        .build();

    DbtCLI task = DbtCLI.builder()
        .id(IdUtils.create())
        .type(DbtCLI.class.getName())
        .profiles(Property.of(PROFILES))
        .projectDir(Property.of("unit-kestra"))
        .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest")
        .commands(List.of("dbt build --project-dir unit-kestra"))
        .loadManifest(manifestSource)
        .build();

    RunContext context = TestsUtils.mockRunContext(runContextFactory, task, Map.of());

    // Stage the sample dbt project under the "unit-kestra" sub-directory, next to sa.json.
    Path taskDir = context.workingDir().path(true);
    ClassLoader resources = this.getClass().getClassLoader();
    Path projectSource = Path.of(Objects.requireNonNull(resources.getResource("project")).getPath());
    Path projectTarget = Path.of(context.workingDir().path().toString(), "unit-kestra");
    copyFolder(projectSource, projectTarget);
    createSaFile(taskDir);

    // Seed the KV store with the manifest fixture the task should load.
    Path manifestFixture = Path.of(
        Objects.requireNonNull(resources.getResource("manifest/manifest.json")).getPath()
    );
    String manifestJson = Files.readString(manifestFixture, StandardCharsets.UTF_8);
    context.namespaceKv(NAMESPACE_ID).put(MANIFEST_KEY, new KVValueAndMetadata(null, manifestJson));

    ScriptOutput output = task.run(context);
    assertThat(output.getExitCode(), is(0));
}

private void createSaFile(Path workingDir) throws IOException {
Path existingSa = Path.of(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
Path workingDirSa = workingDir.resolve("sa.json");
Expand Down
3 changes: 1 addition & 2 deletions src/test/resources/project/models/requests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ SELECT
unique_key,
source,
status,
zipcode_name,
status_change_date
FROM `bigquery-public-data.austin_311.311_service_requests`
LEFT JOIN {{ ref('zipcode') }} ON zipcode_id = incident_zip
LEFT JOIN {{ ref('zipcode') }} ON zipcode_id = CAST(incident_zip as INTEGER)
WHERE city IS NOT NULL
-- this filter will only be applied on an incremental run
{% if is_incremental() %}
Expand Down

0 comments on commit 4d0b183

Please sign in to comment.