From 4d0b18302c7da191826c0633b9133ae490b19acd Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle <54168385+mgabelle@users.noreply.github.com> Date: Tue, 22 Oct 2024 15:34:46 +0200 Subject: [PATCH] feat: add storeManifest and loadManifest (#152) * feat: add storeManifest and loadManifest * doc: update full example flow with storeManifest add doc bump andrcuns/allure-publish-action from 2.7.1 to 2.8.0 #153 * feat: implement loadManifest property * fix: build test * doc: update flow with loadManifest example close #45 --- .github/workflows/main.yml | 2 +- .../java/io/kestra/plugin/dbt/cli/DbtCLI.java | 74 ++++++++++-- .../io/kestra/plugin/dbt/cli/BuildTest.java | 33 +++--- .../io/kestra/plugin/dbt/cli/DbtCLITest.java | 112 +++++++++++++++--- .../resources/project/models/requests.sql | 3 +- 5 files changed, 182 insertions(+), 42 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 502eaa9..02c3888 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -73,7 +73,7 @@ jobs: - uses: rlespinasse/github-slug-action@v4 - name: Publish allure report - uses: andrcuns/allure-publish-action@v2.7.1 + uses: andrcuns/allure-publish-action@v2.8.0 if: ${{ always() && env.GOOGLE_SERVICE_ACCOUNT != 0 && (github.event.inputs.skip-test == 'false' || github.event.inputs.skip-test == '') }} env: GITHUB_AUTH_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java index 430f11f..5c05bd6 100644 --- a/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java +++ b/src/main/java/io/kestra/plugin/dbt/cli/DbtCLI.java @@ -10,6 +10,8 @@ import io.kestra.core.models.tasks.runners.TaskRunner; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVValueAndMetadata; import io.kestra.plugin.dbt.ResultParser; import io.kestra.plugin.scripts.exec.AbstractExecScript; import io.kestra.plugin.scripts.exec.scripts.models.DockerOptions; @@ -18,17 +20,14 @@ import io.kestra.plugin.scripts.runner.docker.Docker; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.Valid; -import lombok.Builder; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.ToString; +import lombok.*; import lombok.experimental.SuperBuilder; import org.apache.commons.io.FileUtils; import java.io.File; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -149,6 +148,12 @@ - dbt deps --project-dir dbt --target prod - dbt build --project-dir dbt --target prod projectDir: dbt + loadManifest: + key: manifest.json + namespace: company.team + storeManifest: + key: manifest.json + namespace: company.team profiles: | my_dbt_project: outputs: @@ -220,6 +225,18 @@ public class DbtCLI extends AbstractExecScript { @Builder.Default protected String containerImage = DEFAULT_IMAGE; + @Schema( + title = "Store manifest.", + description = "Use this field to persist your manifest.json in the KV Store." + ) + protected KvStoreManifest storeManifest; + + @Schema( + title = "Load manifest.", + description = "Use this field to retrieve an existing manifest.json in the KV Store and put it in the inputFiles." + ) + protected KvStoreManifest loadManifest; + @Override protected DockerOptions injectDefaults(DockerOptions original) { if (original == null) { @@ -239,6 +256,13 @@ protected DockerOptions injectDefaults(DockerOptions original) { @Override public ScriptOutput run(RunContext runContext) throws Exception { + KVStore storeManifestKvStore = null; + + //Check/fail if a KV store exists with given namespace + if(this.getStoreManifest() != null) { + storeManifestKvStore = runContext.namespaceKv(this.getStoreManifest().getNamespace().as(runContext, String.class)); + } + CommandsWrapper commands = this.commands(runContext) .withEnableOutputDirectory(true) // force the output dir, so we can get the run_results.json and manifest.json files on each task runners .withLogConsumer(new AbstractLogConsumer() { @@ -250,6 +274,21 @@ public void accept(String line, Boolean isStdErr) { Path projectWorkingDirectory = projectDir == null ? commands.getWorkingDirectory() : commands.getWorkingDirectory().resolve(projectDir.as(runContext, String.class)); + //Load manifest from KV store + if(this.getLoadManifest() != null) { + KVStore loadManifestKvStore = runContext.namespaceKv(this.getLoadManifest().getNamespace().as(runContext, String.class)); + var manifestFile = new File(projectWorkingDirectory.toString(), "target/manifest.json"); + Object manifestValue = loadManifestKvStore.getValue(this.getLoadManifest().getKey().as(runContext, String.class)).get().value(); + + FileUtils.writeStringToFile( + manifestFile, + JacksonMapper.ofJson() + .writeValueAsString(manifestValue), + StandardCharsets.UTF_8 + ); + } + + //Create profiles.yml String profilesString = profiles == null ? null : profiles.as(runContext, String.class); if (profilesString != null && !profilesString.isEmpty()) { var profileFile = new File(commands.getWorkingDirectory().toString(), "profiles.yml"); @@ -264,6 +303,7 @@ public void accept(String line, Boolean isStdErr) { ); } + //Create and run commands List commandsArgs = ScriptService.scriptCommands( this.interpreter, this.getBeforeCommandsWithOptions(), @@ -283,16 +323,36 @@ public void accept(String line, Boolean isStdErr) { .withCommands(commandsArgs) .run(); + //Parse run results if (this.parseRunResults.as(runContext, Boolean.class) && projectWorkingDirectory.resolve("target/run_results.json").toFile().exists()) { URI results = ResultParser.parseRunResult(runContext, projectWorkingDirectory.resolve("target/run_results.json").toFile()); run.getOutputFiles().put("run_results.json", results); } - if (projectWorkingDirectory.resolve("target/manifest.json").toFile().exists()) { - URI manifest = ResultParser.parseManifest(runContext, projectWorkingDirectory.resolve("target/manifest.json").toFile()); + File manifestFile = projectWorkingDirectory.resolve("target/manifest.json").toFile(); + if (manifestFile.exists()) { + if(this.storeManifest != null) { + final String key = this.getStoreManifest().getKey().as(runContext, String.class); + storeManifestKvStore.put(key, new KVValueAndMetadata(null, JacksonMapper.toObject(Files.readString(manifestFile.toPath())))); + } + + URI manifest = ResultParser.parseManifest(runContext, manifestFile); run.getOutputFiles().put("manifest.json", manifest); + } return run; } + + @Builder + @Getter + public static class KvStoreManifest { + @NotNull + @Schema(title = "Key", description = "KV store key containing the manifest.json") + Property key; + + @NotNull + @Schema(title = "Namespace", description = "KV store namespace containing the manifest.json") + Property namespace; + } } diff --git a/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java b/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java index a53ad50..b539bfd 100644 --- a/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java +++ b/src/test/java/io/kestra/plugin/dbt/cli/BuildTest.java @@ -27,6 +27,22 @@ @KestraTest class BuildTest { + private static final String PROFILES = """ + unit-kestra: + outputs: + dev: + dataset: kestra_unit_test_us + fixed_retries: 1 + location: US + method: service-account + priority: interactive + project: kestra-unit-test + threads: 1 + timeout_seconds: 300 + type: bigquery + keyfile: sa.json + target: dev + """; @Inject private RunContextFactory runContextFactory; @@ -47,22 +63,7 @@ void run() throws Exception { .id(IdUtils.create()) .type(Setup.class.getName()) .taskRunner(Process.instance()) - .profiles(Property.of(Map.of( - "unit-kestra", Map.of( - "outputs", Map.of( - "dev", Map.of("dataset", "kestra_unit_test_us", - "fixed_retries", "1", - "location", "US", - "method", "oauth", - "priority", "interactive", - "project", "kestra-unit-test", - "threads", "1", - "timeout_seconds", "300", - "type", "bigquery" - ) - ), - "target", "dev" - )))) + .profiles(Property.of(PROFILES)) .requirements(Property.of(List.of("dbt-bigquery"))) .build(); diff --git a/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java b/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java index c29ee6b..dde833a 100644 --- a/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java +++ b/src/test/java/io/kestra/plugin/dbt/cli/DbtCLITest.java @@ -3,14 +3,19 @@ import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.StorageInterface; +import io.kestra.core.storages.kv.KVStore; +import io.kestra.core.storages.kv.KVValueAndMetadata; import io.kestra.core.utils.IdUtils; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.scripts.exec.scripts.models.ScriptOutput; import io.kestra.core.junit.annotations.KestraTest; import jakarta.inject.Inject; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; @@ -25,9 +30,33 @@ @KestraTest class DbtCLITest { + @Inject + StorageInterface storageInterface; + @Inject private RunContextFactory runContextFactory; + private final String NAMESPACE_ID = "io.kestra.plugin.dbt.cli.dbtclitest"; + + private final String MANIFEST_KEY = "manifest.json"; + + private static final String PROFILES = """ + unit-kestra: + outputs: + dev: + dataset: kestra_unit_test_us + fixed_retries: 1 + location: US + method: service-account + priority: interactive + project: kestra-unit-test + threads: 1 + timeout_seconds: 300 + type: bigquery + keyfile: sa.json + target: dev + """; + public void copyFolder(Path src, Path dest) throws IOException { try (Stream stream = Files.walk(src)) { stream @@ -44,22 +73,7 @@ void run() throws Exception { DbtCLI execute = DbtCLI.builder() .id(IdUtils.create()) .type(DbtCLI.class.getName()) - .profiles(Property.of(""" - unit-kestra: - outputs: - dev: - dataset: kestra_unit_test_us - fixed_retries: 1 - location: US - method: service-account - priority: interactive - project: kestra-unit-test - threads: 1 - timeout_seconds: 300 - type: bigquery - keyfile: sa.json - target: dev - """) + .profiles(Property.of(PROFILES) ) .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest") .commands(List.of("dbt build")) @@ -76,6 +90,72 @@ void run() throws Exception { assertThat(runOutput.getExitCode(), is(0)); } + @Test + void testDbtCliWithStoreManifest_manifestShouldBePresentInKvStore() throws Exception { + DbtCLI execute = DbtCLI.builder() + .id(IdUtils.create()) + .type(DbtCLI.class.getName()) + .profiles(Property.of(PROFILES) + ) + .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest") + .commands(List.of("dbt build")) + .storeManifest( + DbtCLI.KvStoreManifest.builder() + .key(Property.of(MANIFEST_KEY)) + .namespace(Property.of(NAMESPACE_ID)) + .build() + ) + .build(); + + RunContext runContext = TestsUtils.mockRunContext(runContextFactory, execute, Map.of()); + + Path workingDir = runContext.workingDir().path(true); + copyFolder(Path.of(Objects.requireNonNull(this.getClass().getClassLoader().getResource("project")).getPath()), workingDir); + createSaFile(workingDir); + + ScriptOutput runOutput = execute.run(runContext); + + assertThat(runOutput.getExitCode(), is(0)); + KVStore kvStore = runContext.namespaceKv(NAMESPACE_ID); + assertThat(kvStore.get(MANIFEST_KEY).isPresent(), is(true)); + Map manifestValue = (Map) kvStore.getValue(MANIFEST_KEY).get().value(); + assertThat(((Map) manifestValue.get("metadata")).get("project_name"), is("unit_kestra")); + } + + @Disabled("To run put a manifest.json under src/test/resources/manifest/") + @Test + void testDbtWithLoadManifest_manifestShouldBeLoadedFromKvStore() throws Exception { + DbtCLI loadManifest = DbtCLI.builder() + .id(IdUtils.create()) + .type(DbtCLI.class.getName()) + .profiles(Property.of(PROFILES)) + .projectDir(Property.of("unit-kestra")) + .containerImage("ghcr.io/kestra-io/dbt-bigquery:latest") + .commands(List.of("dbt build --project-dir unit-kestra")) + .loadManifest( + DbtCLI.KvStoreManifest.builder() + .key(Property.of(MANIFEST_KEY)) + .namespace(Property.of(NAMESPACE_ID)) + .build() + ) + .build(); + + RunContext runContextLoad = TestsUtils.mockRunContext(runContextFactory, loadManifest, Map.of()); + + Path workingDir = runContextLoad.workingDir().path(true); + copyFolder(Path.of(Objects.requireNonNull(this.getClass().getClassLoader().getResource("project")).getPath()), + Path.of(runContextLoad.workingDir().path().toString(),"unit-kestra")); + createSaFile(workingDir); + String manifestValue = Files.readString(Path.of( + Objects.requireNonNull(this.getClass().getClassLoader().getResource("manifest/manifest.json")).getPath()) + , StandardCharsets.UTF_8); + runContextLoad.namespaceKv(NAMESPACE_ID).put(MANIFEST_KEY, new KVValueAndMetadata(null, manifestValue)); + + ScriptOutput runOutputLoad = loadManifest.run(runContextLoad); + + assertThat(runOutputLoad.getExitCode(), is(0)); + } + private void createSaFile(Path workingDir) throws IOException { Path existingSa = Path.of(System.getenv("GOOGLE_APPLICATION_CREDENTIALS")); Path workingDirSa = workingDir.resolve("sa.json"); diff --git a/src/test/resources/project/models/requests.sql b/src/test/resources/project/models/requests.sql index a03a63e..1e78d0c 100644 --- a/src/test/resources/project/models/requests.sql +++ b/src/test/resources/project/models/requests.sql @@ -9,10 +9,9 @@ SELECT unique_key, source, status, - zipcode_name, status_change_date FROM `bigquery-public-data.austin_311.311_service_requests` - LEFT JOIN {{ ref('zipcode') }} ON zipcode_id = incident_zip + LEFT JOIN {{ ref('zipcode') }} ON zipcode_id = CAST(incident_zip as INTEGER) WHERE city IS NOT NULL -- this filter will only be applied on an incremental run {% if is_incremental() %}