From 42a4bb828047c44efa39c241b0549ae5a2351911 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Mon, 21 Oct 2024 16:47:11 +0200 Subject: [PATCH 1/5] feat: add storeManifest and loadManifest --- .../java/io/kestra/plugin/dbt/cli/DbtCLI.java | 49 ++++++++++-- .../io/kestra/plugin/dbt/cli/DbtCLITest.java | 75 +++++++++++++++---- .../resources/project/models/requests.sql | 3 +- 3 files changed, 102 insertions(+), 25 deletions(-) diff --git a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java index 430f11f..dbc0927 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java @@ -10,6 +10,8 @@ import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVValueAndMetadata; import io.kestra.plugin.dbt.ResultParser; import io.kestra.plugin.scripts.exec.AbstractExecScript; import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions; @@ -18,17 +20,14 @@ import io.kestra.plugin.scripts.runner.docker.Docker; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.Valid; -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; +import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.io.FileUtils; import java.io.File; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -220,6 +219,18 @@ public class DbtCLI extends AbstractExecScript { @Builder.Default protected String containerImage = DEFAULT_IMAGE; + @Schema( + title = "Store manifest.", + description = "Use this field to persist your manifest.json in the KV Store." + ) + protected KvStoreManifest storeManifest; + + @Schema( + title = "Load manifest.", + description = "Use this field to retrieve an existing manifest.json in the KV Store and put it in the inputFiles." + ) + protected KvStoreManifest loadManifest; + @Override protected DockerOptions injectDefaults(DockerOptions original) { if (original == null) { @@ -239,6 +250,11 @@ protected DockerOptions injectDefaults(DockerOptions original) { @Override public ScriptOutput run(RunContext runContext) throws Exception { + KVStore kvStore = null; + if(this.getStoreManifest() != null) { + kvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class)); + } + CommandsWrapper commands = this.commands(runContext) .withEnableOutputDirectory(true) // force the output dir, so we can get the run_results.json and manifest.json files on each task runners .withLogConsumer(new AbstractLogConsumer() { @@ -288,11 +304,30 @@ public void accept(String line, Boolean isStdErr) { run.getOutputFiles().put("run_results.json", results); } - if (projectWorkingDirectory.resolve("target/manifest.json").toFile().exists()) { - URI manifest = ResultParser.parseManifest(runContext, projectWorkingDirectory.resolve("target/manifest.json").toFile()); + File manifestFile = projectWorkingDirectory.resolve("target/manifest.json").toFile(); + if (manifestFile.exists()) { + if(this.storeManifest != null) { + final String key = this.getStoreManifest().getKey().as(runContext, String.class); + kvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath())))); + } + + URI manifest = ResultParser.parseManifest(runContext, manifestFile); run.getOutputFiles().put("manifest.json", manifest); + } return run; } + + @Builder + @Getter + public static class KvStoreManifest { + @NotNull + @Schema(title = "Key", description = "KV store key containing the manifest.json") + Property key; + + @NotNull + @Schema(title = "Namespace", description = "KV store namespace containing the manifest.json") + Property namespace; + } } diff --git a/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java b/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java index c29ee6b..5a769e8 100644 --- a/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java +++ b/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java @@ -3,6 +3,8 @@ import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.kv.KVStore; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; @@ -25,9 +27,33 @@ @KestraTest class DbtCLITest { + @Inject + StorageInterface storageInterface; + @Inject private RunContextFactory runContextFactory; + private final String NAMESPACE_ID = "io.kestra.plugin.dbt.cli.dbtclitest"; + + private final String MANIFEST_KEY = "manifest.json"; + + private static final String PROFILES = """ + unit-kestra: + outputs: + dev: + dataset: kestra_unit_test_us + fixed_retries: 1 + location: US + method: service-account + priority: interactive + project: kestra-unit-test + threads: 1 + timeout_seconds: 300 + type: bigquery + keyfile: sa.json + target: dev + """; + public void copyFolder(Path src, Path dest) throws IOException { try (Stream stream = Files.walk(src)) { stream @@ -44,22 +70,7 @@ void run() throws Exception { DbtCLI execute = DbtCLI.builder() .id(IdUtils.create()) .type(DbtCLI.class.getName()) - .profiles(Property.of(""" - unit-kestra: - outputs: - dev: - dataset: kestra_unit_test_us - fixed_retries: 1 - location: US - method: service-account - priority: interactive - project: kestra-unit-test - threads: 1 - timeout_seconds: 300 - type: bigquery - keyfile: sa.json - target: dev - """) + .profiles(Property.of(PROFILES) ) .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest") .commands(List.of("dbt build")) @@ -76,6 +87,38 @@ void run() throws Exception { assertThat(runOutput.getExitCode(), is(0)); } + @Test + void testDbtCliWithStoreManifest_manifestShouldBePresentInKvStore() throws Exception { + DbtCLI execute = DbtCLI.builder() + .id(IdUtils.create()) + .type(DbtCLI.class.getName()) + .profiles(Property.of(PROFILES) + ) + .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest") + .commands(List.of("dbt build")) + .storeManifest( + DbtCLI.KvStoreManifest.builder() + .key(Property.of(MANIFEST_KEY)) + .namespace(Property.of(NAMESPACE_ID)) + .build() + ) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, execute, Map.of()); + + Path workingDir = runContext.workingDir().path(true); + copyFolder(Path.of(Objects.requireNonNull(this.getClass().getClassLoader().getResource("project")).getPath()), workingDir); + createSaFile(workingDir); + + ScriptOutput runOutput = execute.run(runContext); + + assertThat(runOutput.getExitCode(), is(0)); + KVStore kvStore = runContext.namespaceKv(NAMESPACE_ID); + assertThat(kvStore.get(MANIFEST_KEY).isPresent(), is(true)); + Map manifestValue = (Map) kvStore.getValue(MANIFEST_KEY).get().value(); + assertThat(((Map) manifestValue.get("metadata")).get("project_name"), is("unit_kestra")); + } + private void createSaFile(Path workingDir) throws IOException { Path existingSa = Path.of(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); Path workingDirSa = workingDir.resolve("sa.json"); diff --git a/src/test/resources/project/models/requests.sql b/src/test/resources/project/models/requests.sql index a03a63e..1e78d0c 100644 --- a/src/test/resources/project/models/requests.sql +++ b/src/test/resources/project/models/requests.sql @@ -9,10 +9,9 @@ SELECT unique_key, source, status, - zipcode_name, status_change_date FROM `bigquery-public-data.austin_311.311_service_requests` - LEFT JOIN {{ ref('zipcode') }} ON zipcode_id = incident_zip + LEFT JOIN {{ ref('zipcode') }} ON zipcode_id = CAST(incident_zip as INTEGER) WHERE city IS NOT NULL -- this filter will only be applied on an incremental run {% if is_incremental() %} From e194ad4655fc9df1a6beda0584687ddde5b03239 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Mon, 21 Oct 2024 16:55:27 +0200 Subject: [PATCH 2/5] doc: update full example flow with storeManifest add doc bump andrcuns/allure-publish-action from 2.7.1 to 2.8.0 #153 --- .github/workflows/main.yml | 2 +- src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 502eaa9..02c3888 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -73,7 +73,7 @@ jobs: - uses: rlespinasse/github-slug-action@v4 - name: Publish allure report - uses: andrcuns/allure-publish-action@v2.7.1 + uses: andrcuns/allure-publish-action@v2.8.0 if: ${{ always() && env.GOOGLE_SERVICE_ACCOUNT != 0 && (github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '') }} env: GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java index dbc0927..9138e60 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java @@ -148,6 +148,9 @@ - dbt deps --project-dir dbt --target prod - dbt build --project-dir dbt --target prod projectDir: dbt + storeManifest: + key: manifest.json + namespace: company.team profiles: | my_dbt_project: outputs: From 9375a5ac64fe3fd3b8e6b833676766381716241c Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Tue, 22 Oct 2024 14:47:35 +0200 Subject: [PATCH 3/5] feat: implement loadManifest property --- .../java/io/kestra/plugin/dbt/cli/DbtCLI.java | 25 +++++++++++-- .../io/kestra/plugin/dbt/cli/BuildTest.java | 2 +- .../io/kestra/plugin/dbt/cli/DbtCLITest.java | 37 +++++++++++++++++++ 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java index 9138e60..d9081ea 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java @@ -253,9 +253,11 @@ protected DockerOptions injectDefaults(DockerOptions original) { @Override public ScriptOutput run(RunContext runContext) throws Exception { - KVStore kvStore = null; + KVStore storeManifestKvStore = null; + + //Check/fail if a KV store exists with given namespace if(this.getStoreManifest() != null) { - kvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class)); + storeManifestKvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class)); } CommandsWrapper commands = this.commands(runContext) @@ -269,6 +271,21 @@ public void accept(String line, Boolean isStdErr) { Path projectWorkingDirectory = projectDir == null ? commands.getWorkingDirectory() : commands.getWorkingDirectory().resolve(projectDir.as(runContext, String.class)); + //Load manifest from KV store + if(this.getLoadManifest() != null) { + KVStore loadManifestKvStore = runContext.namespaceKv(this.getLoadManifest().getNamespace().as(runContext, String.class)); + var manifestFile = new File(projectWorkingDirectory.toString(), "target/manifest.json"); + Object manifestValue = loadManifestKvStore.getValue(this.getLoadManifest().getKey().as(runContext, String.class)).get().value(); + + FileUtils.writeStringToFile( + manifestFile, + JacksonMapper.ofJson() + .writeValueAsString(manifestValue), + StandardCharsets.UTF_8 + ); + } + + //Create profiles.yml String profilesString = profiles == null ? null : profiles.as(runContext, String.class); if (profilesString != null && !profilesString.isEmpty()) { var profileFile = new File(commands.getWorkingDirectory().toString(), "profiles.yml"); @@ -283,6 +300,7 @@ public void accept(String line, Boolean isStdErr) { ); } + //Create and run commands List commandsArgs = ScriptService.scriptCommands( this.interpreter, this.getBeforeCommandsWithOptions(), @@ -302,6 +320,7 @@ public void accept(String line, Boolean isStdErr) { .withCommands(commandsArgs) .run(); + //Parse run results if (this.parseRunResults.as(runContext, Boolean.class) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) { URI results = ResultParser.parseRunResult(runContext, projectWorkingDirectory.resolve("target/run_results.json").toFile()); run.getOutputFiles().put("run_results.json", results); @@ -311,7 +330,7 @@ public void accept(String line, Boolean isStdErr) { if (manifestFile.exists()) { if(this.storeManifest != null) { final String key = this.getStoreManifest().getKey().as(runContext, String.class); - kvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath())))); + storeManifestKvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath())))); } URI manifest = ResultParser.parseManifest(runContext, manifestFile); diff --git a/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java b/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java index a53ad50..7b00768 100644 --- a/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java +++ b/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java @@ -61,7 +61,7 @@ void run() throws Exception { "type", "bigquery" ) ), - "target", "dev" + "manifest", "dev" )))) .requirements(Property.of(List.of("dbt-bigquery"))) .build(); diff --git a/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java b/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java index 5a769e8..dde833a 100644 --- a/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java +++ b/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java @@ -5,14 +5,17 @@ import io.kestra.core.runners.RunContextFactory; import io.kestra.core.storages.StorageInterface; import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVValueAndMetadata; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; import io.kestra.core.junit.annotations.KestraTest; import jakarta.inject.Inject; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; @@ -119,6 +122,40 @@ void testDbtCliWithStoreManifest_manifestShouldBePresentInKvStore() throws Excep assertThat(((Map) manifestValue.get("metadata")).get("project_name"), is("unit_kestra")); } + @Disabled("To run put a manifest.json under src/test/resources/manifest/") + @Test + void testDbtWithLoadManifest_manifestShouldBeLoadedFromKvStore() throws Exception { + DbtCLI loadManifest = DbtCLI.builder() + .id(IdUtils.create()) + .type(DbtCLI.class.getName()) + .profiles(Property.of(PROFILES)) + .projectDir(Property.of("unit-kestra")) + .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest") + .commands(List.of("dbt build --project-dir unit-kestra")) + .loadManifest( + DbtCLI.KvStoreManifest.builder() + .key(Property.of(MANIFEST_KEY)) + .namespace(Property.of(NAMESPACE_ID)) + .build() + ) + .build(); + + RunContext runContextLoad = TestsUtils.mockRunContext(runContextFactory, loadManifest, Map.of()); + + Path workingDir = runContextLoad.workingDir().path(true); + copyFolder(Path.of(Objects.requireNonNull(this.getClass().getClassLoader().getResource("project")).getPath()), + Path.of(runContextLoad.workingDir().path().toString(),"unit-kestra")); + createSaFile(workingDir); + String manifestValue = Files.readString(Path.of( + Objects.requireNonNull(this.getClass().getClassLoader().getResource("manifest/manifest.json")).getPath()) + , StandardCharsets.UTF_8); + runContextLoad.namespaceKv(NAMESPACE_ID).put(MANIFEST_KEY, new KVValueAndMetadata(null, manifestValue)); + + ScriptOutput runOutputLoad = loadManifest.run(runContextLoad); + + assertThat(runOutputLoad.getExitCode(), is(0)); + } + private void createSaFile(Path workingDir) throws IOException { Path existingSa = Path.of(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); Path workingDirSa = workingDir.resolve("sa.json"); From 07ba3f4d28445d37c13242fc8b5343682d8cc20d Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Tue, 22 Oct 2024 15:11:50 +0200 Subject: [PATCH 4/5] fix: build test --- .../io/kestra/plugin/dbt/cli/BuildTest.java | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java b/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java index 7b00768..b539bfd 100644 --- a/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java +++ b/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java @@ -27,6 +27,22 @@ @KestraTest class BuildTest { + private static final String PROFILES = """ + unit-kestra: + outputs: + dev: + dataset: kestra_unit_test_us + fixed_retries: 1 + location: US + method: service-account + priority: interactive + project: kestra-unit-test + threads: 1 + timeout_seconds: 300 + type: bigquery + keyfile: sa.json + target: dev + """; @Inject private RunContextFactory runContextFactory; @@ -47,22 +63,7 @@ void run() throws Exception { .id(IdUtils.create()) .type(Setup.class.getName()) .taskRunner(Process.instance()) - .profiles(Property.of(Map.of( - "unit-kestra", Map.of( - "outputs", Map.of( - "dev", Map.of("dataset", "kestra_unit_test_us", - "fixed_retries", "1", - "location", "US", - "method", "oauth", - "priority", "interactive", - "project", "kestra-unit-test", - "threads", "1", - "timeout_seconds", "300", - "type", "bigquery" - ) - ), - "manifest", "dev" - )))) + .profiles(Property.of(PROFILES)) .requirements(Property.of(List.of("dbt-bigquery"))) .build(); From fa852e100684d155d890161615e8ad1f7f899ee1 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Tue, 22 Oct 2024 15:22:11 +0200 Subject: [PATCH 5/5] doc: update flow with loadManifest example --- src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java index d9081ea..5c05bd6 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java @@ -148,6 +148,9 @@ - dbt deps --project-dir dbt --target prod - dbt build --project-dir dbt --target prod projectDir: dbt + loadManifest: + key: manifest.json + namespace: company.team storeManifest: key: manifest.json namespace: company.team