diff --git a/CHANGELOG.md b/CHANGELOG.md index 70f0a1c42..ec68eab27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# Release 0.20.0 +- [GH-1681] SG/GDC is reprocessing CRAMs and running into Clio update conflicts. ([#620](https://github.com/broadinstitute/wfl/pull/620)) +- [GH-1674] wfl.module.aou/add-aou-workload! should not add to a stopped workload. ([#617](https://github.com/broadinstitute/wfl/pull/617)) + # Release 0.19.1 - [GH-1686] Deploy WFL to point at ExternalExomeReprocessing_v3.1.7 and pick up picard-cloud:2.26.10. ([#618](https://github.com/broadinstitute/wfl/pull/618)) - [GH-1667] Don't delete a Firecloud entity before a Rawls batchUpsert. ([#616](https://github.com/broadinstitute/wfl/pull/616)) diff --git a/api/src/wfl/module/aou.clj b/api/src/wfl/module/aou.clj index 8ce40b81b..c3a26dd1e 100644 --- a/api/src/wfl/module/aou.clj +++ b/api/src/wfl/module/aou.clj @@ -1,22 +1,23 @@ (ns wfl.module.aou "Process Arrays for the All Of Us project." - (:require [clojure.spec.alpha :as s] - [clojure.string :as str] - [wfl.api.workloads :as workloads :refer [defoverload]] - [wfl.jdbc :as jdbc] - [wfl.log :as log] - [wfl.module.all :as all] - [wfl.module.batch :as batch] - [wfl.references :as references] - [wfl.service.cromwell :as cromwell] - [wfl.service.google.storage :as gcs] - [wfl.service.postgres :as postgres] - [wfl.util :as util] - [wfl.wfl :as wfl]) + (:require [clojure.spec.alpha :as s] + [clojure.string :as str] + [wfl.api.workloads :as workloads :refer [defoverload]] + [wfl.jdbc :as jdbc] + [wfl.log :as log] + [wfl.module.all :as all] + [wfl.module.batch :as batch] + [wfl.references :as references] + [wfl.service.cromwell :as cromwell] + [wfl.service.postgres :as postgres] + [wfl.util :as util] + [wfl.wfl :as wfl]) (:import [java.sql Timestamp] - [java.time OffsetDateTime] + [java.time Instant] [java.util UUID])) +;; This must agree with the AoU cloud function. +;; (def pipeline "AllOfUsArrays") ;; specs @@ -70,8 +71,7 @@ {:environment "prod" :vault_token_path "gs://broad-dsp-gotc-arrays-prod-tokens/arrayswdl.token"}]) -;; visible for testing -(defn cromwell->inputs+options +(defn ^:private cromwell->inputs+options "Map cromwell URL to workflow inputs and options for submitting an AllOfUs Arrays workflow. The returned environment string here is just a default, input file may specify override." [url] @@ -135,23 +135,26 @@ (update :environment str/lower-case) (util/prefix-keys :Arrays.))) -;; visible for testing -(def default-options +(def ^:private default-options {:use_relative_output_paths true :read_from_cache true :write_to_cache true - :default_runtime_attributes {:zones "us-central1-a us-central1-b us-central1-c us-central1-f" - :maxRetries 1}}) + :default_runtime_attributes + {:zones "us-central1-a us-central1-b us-central1-c us-central1-f" + :maxRetries 1}}) + +(def ^:private primary-keys + "These uniquely identify a sample so use them as a primary key." + [:chip_well_barcode :analysis_version_number]) (defn make-labels "Return labels for aou arrays pipeline from PER-SAMPLE-INPUTS and OTHER-LABELS." [per-sample-inputs other-labels] (merge cromwell-label-map - (select-keys per-sample-inputs [:analysis_version_number :chip_well_barcode]) + (select-keys per-sample-inputs primary-keys) other-labels)) -;; visible for testing -(defn submit-aou-workflow +(defn ^:private submit-aou-workflow "Submit one workflow to Cromwell URL given PER-SAMPLE-INPUTS, WORKFLOW-OPTIONS and OTHER-LABELS." [url per-sample-inputs workflow-options other-labels] @@ -162,80 +165,72 @@ workflow-options (make-labels per-sample-inputs other-labels))) -;; This needs the id generated by the jdbc/insert! to name a new table, -;; so update the workload table after creating the AllOfUsArrays table. +;; Update the workload table row with the name of the AllOfUsArrays table. ;; https://www.postgresql.org/docs/current/datatype-numeric.html#DATATYPE-SERIAL ;; -(defn add-aou-workload! - "Use transaction `tx` to record the workflow described in `_request`. - Add the workflow to an old workload matching the `_request` parameters. - Otherwise create a new workload to match those parameters. " - [tx {:keys [creator executor pipeline project output watchers] :as _request}] - (gcs/parse-gs-url output) - (let [slashified-output (util/slashify output) - {:keys [release path]} workflow-wdl - {:keys [commit version]} (wfl/get-the-version) - query-string (str/join \space - ["SELECT * FROM workload" - "WHERE project = ?" - "AND pipeline = ?::pipeline" - "AND release = ?" - "AND output = ?"]) - workloads (jdbc/query tx [query-string - project pipeline - release slashified-output]) - n (count workloads)] +(defn ^:private make-new-aou-workload! + "Use transaction `tx` to record a new `workload` and return its `id`." + [tx workload] + (let [id (->> workload (jdbc/insert! tx :workload) first :id) + table (format "%s_%09d" pipeline id) + table_seq (format "%s_id_seq" table) + kind (format (str/join \space ["UPDATE workload" + "SET pipeline = '%s'::pipeline" + "WHERE id = '%s'"]) pipeline id) + index (format "CREATE SEQUENCE %s AS bigint" table_seq) + work (format (str/join \space ["CREATE TABLE %s OF %s" + "(PRIMARY KEY" + "(analysis_version_number," + "chip_well_barcode)," + "id WITH OPTIONS NOT NULL" + "DEFAULT nextval('%s'))"]) + table pipeline table_seq) + alter (format "ALTER SEQUENCE %s OWNED BY %s.id" table_seq table)] + (jdbc/db-do-commands tx [kind index work alter]) + (jdbc/update! tx :workload {:items table} ["id = ?" id]) + id)) + +(defn ^:private add-aou-workload! + "Use transaction `tx` to find a workload matching `request`, or make a + new one, and return the workload's `id`. " + [tx request] + (let [{:keys [creator executor pipeline project output watchers]} request + slashified (util/slashify output) + {:keys [release path]} workflow-wdl + query-string (str/join \space ["SELECT * FROM workload" + "WHERE stopped is null" + "AND project = ?" + "AND pipeline = ?::pipeline" + "AND release = ?" + "AND output = ?"]) + workloads (jdbc/query tx [query-string project pipeline + release slashified]) + n (count workloads)] (when (> n 1) (log/error "Too many workloads" :count n :workloads workloads)) (if-let [workload (first workloads)] (:id workload) - (let [id (->> {:commit commit - :creator creator - :executor executor - :output slashified-output - :project project - :release release - :uuid (UUID/randomUUID) - :version version - :watchers (pr-str watchers) - :wdl path} - (jdbc/insert! tx :workload) - first :id) - table (format "%s_%09d" pipeline id) - table_seq (format "%s_id_seq" table) - kind (format (str/join \space - ["UPDATE workload" - "SET pipeline = '%s'::pipeline" - "WHERE id = '%s'"]) pipeline id) - idx (format "CREATE SEQUENCE %s AS bigint" table_seq) - work (format (str/join \space - ["CREATE TABLE %s OF %s" - "(PRIMARY KEY" - "(analysis_version_number," - "chip_well_barcode)," - "id WITH OPTIONS" - "NOT NULL" - "DEFAULT nextval('%s'))"]) - table pipeline table_seq) - link-idx-work (format "ALTER SEQUENCE %s OWNED BY %s.id" - table_seq table)] - (jdbc/db-do-commands tx [kind idx work link-idx-work]) - (jdbc/update! tx :workload {:items table} ["id = ?" id]) - id)))) + (let [{:keys [commit version]} (wfl/get-the-version)] + (make-new-aou-workload! tx {:commit commit + :creator creator + :executor executor + :output slashified + :project project + :release release + :uuid (UUID/randomUUID) + :version version + :watchers (pr-str watchers) + :wdl path}))))) -(defn start-aou-workload! +(defn ^:private start-aou-workload! "Use transaction `tx` to start `workload` so it becomes append-able." [tx {:keys [id] :as workload}] (if (:started workload) workload - (let [now {:started (Timestamp/from (.toInstant (OffsetDateTime/now)))}] + (let [now {:started (Timestamp/from (Instant/now))}] (jdbc/update! tx :workload now ["id = ?" id]) (merge workload now)))) -(def primary-keys - "These uniquely identify a sample so use them as a primary key." - [:chip_well_barcode :analysis_version_number]) - (defn ^:private primary-values [sample] (mapv sample primary-keys)) @@ -267,29 +262,33 @@ (second (reduce go [known-keys []] samples)))) (defn append-to-workload! - "Use transaction `tx` to append `notifications` (or samples) to `workload`. - Note: - - The `workload` must be `started` in order to be append-able. - - All samples being appended will be submitted immediately." + "Use transaction `tx` to add `notifications` (samples) to `workload`. + Note: - The `workload` must be `started` in order to be append-able. + - All samples being appended will be submitted immediately." [tx notifications {:keys [uuid items output executor] :as workload}] (when-not (:started workload) (throw (Exception. (format "Workload %s is not started" uuid)))) (when (:stopped workload) (throw (Exception. (format "Workload %s has been stopped" uuid)))) (letfn [(submit! [url sample] - (let [output-path (str output (str/join "/" (primary-values sample))) - workflow-options (util/deep-merge default-options - {:final_workflow_outputs_dir output-path})] - (->> (submit-aou-workflow url sample workflow-options {:workload uuid}) + (let [output-path (str output + (str/join "/" (primary-values sample))) + workflow-options (util/deep-merge + default-options + {:final_workflow_outputs_dir output-path})] + (->> {:workload uuid} + (submit-aou-workflow url sample workflow-options) str ; coerce java.util.UUID -> string (assoc (select-keys sample primary-keys) - :updated (Timestamp/from (.toInstant (OffsetDateTime/now))) + :updated (Timestamp/from (Instant/now)) :status "Submitted" :uuid))))] - (let [executor (is-known-cromwell-url? executor) + (let [executor (is-known-cromwell-url? executor) submitted-samples (map (partial submit! executor) - (remove-existing-samples notifications - (get-existing-samples tx items notifications)))] + (remove-existing-samples + notifications + (get-existing-samples + tx items notifications)))] (jdbc/insert-multi! tx items submitted-samples) submitted-samples))) diff --git a/api/src/wfl/module/copyfile.clj b/api/src/wfl/module/copyfile.clj index c589a2d34..4f4c1b27b 100644 --- a/api/src/wfl/module/copyfile.clj +++ b/api/src/wfl/module/copyfile.clj @@ -130,10 +130,11 @@ (util/deep-merge default-options options) {:workload uuid})]) (update! [tx [id uuid]] - (jdbc/update! tx items - {:updated (OffsetDateTime/now) :uuid uuid :status "Submitted"} - ["id = ?" id]))] - (run! (comp (partial update! tx) submit!) (workloads/workflows tx workload)) + (jdbc/update! tx items {:status "Submitted" + :updated (OffsetDateTime/now) + :uuid uuid} ["id = ?" id]))] + (run! (comp (partial update! tx) submit!) + (workloads/workflows tx workload)) (jdbc/update! tx :workload {:started (OffsetDateTime/now)} ["uuid = ?" uuid])))) diff --git a/api/src/wfl/module/sg.clj b/api/src/wfl/module/sg.clj index 381d48891..4c1a8bffd 100644 --- a/api/src/wfl/module/sg.clj +++ b/api/src/wfl/module/sg.clj @@ -134,8 +134,8 @@ :sample_alias :version])))) -(defn final_workflow_outputs_dir_hack ; for testing - "Do to `file` what `{:final_workflow_outputs_dir output}` does." +(defn ^:private final_workflow_outputs_dir_hack + "Do to `file` what Cromwell's `{:final_workflow_outputs_dir output}` does." [output file] (->> (str/split file #"/") (drop 3) @@ -153,14 +153,41 @@ (log/warn "Need output files for Clio.") (log/error {:need need})))) +;; This hack depends on how Clio spells error messages. +;; +(defn ^:private hack-try-increment-version-in-clio-add-bam? + "True when `exception` suggests that `clio-add-bam` might succeed + with the version incremented." + [exception] + (let [{:keys [body reason-phrase status]} (ex-data exception)] + (and + (== 400 status) + (= "Bad Request" reason-phrase) + (str/starts-with? + body + "Adding this document will overwrite the following existing metadata:") + (str/ends-with? + body + "Use 'force=true' to overwrite the existing data.")))) + +(defn ^:private hack-clio-add-bam-with-version-incremented + "Attempt to add `bam` record to `clio` with version `increment`ed." + [clio bam increment] + (when (> increment 2) + (throw (ex-info "Cannot update Clio" {:bam bam :clio clio}))) + (try (clio/add-bam clio (update bam :version + increment)) + (catch Throwable x + (log/warning {:bam bam :x x}) + (if (hack-try-increment-version-in-clio-add-bam? x) + (hack-clio-add-bam-with-version-incremented clio bam (inc increment)) + (throw x))))) + (defn ^:private clio-add-bam - "Add `bam` record to `clio`." + "Add `bam` record to `clio`, and maybe retry after incrementing :version." [clio bam] - (try (clio/add-bam clio bam) - (catch Throwable x - (log/error {:bam bam :x x})))) + (hack-clio-add-bam-with-version-incremented clio bam 0)) -(defn maybe-update-clio-and-write-final-files +(defn ^:private maybe-update-clio-and-write-final-files "Maybe update `clio-url` with `final` and write files and `metadata`." [clio-url final {:keys [inputs] :as metadata}] #_(log-missing-final-files-for-debugging final) diff --git a/api/src/wfl/sink.clj b/api/src/wfl/sink.clj index 8ff06b740..f9d2a8a2f 100644 --- a/api/src/wfl/sink.clj +++ b/api/src/wfl/sink.clj @@ -19,8 +19,7 @@ [wfl.service.rawls :as rawls] [wfl.stage :as stage] [wfl.util :as util :refer [utc-now]]) - (:import [clojure.lang ExceptionInfo] - [wfl.util UserException])) + (:import [wfl.util UserException])) (defmulti create-sink! "Create a `Sink` instance using the database `transaction` and configuration @@ -102,14 +101,14 @@ (assoc :type terra-workspace-sink-type)) (throw (ex-info "Invalid sink_items" {:workload workload})))) -(def unknown-entity-type-error-message +(def ^:private unknown-entity-type-error-message "The entityType was not found in workspace.") -(def terra-workspace-malformed-from-outputs-message +(def ^:private terra-workspace-malformed-from-outputs-message (str/join " " ["fromOutputs must define a mapping from workflow outputs" "to the attributes of entityType."])) -(def unknown-attributes-error-message +(def ^:private unknown-attributes-error-message (str/join " " ["Found additional attributes in fromOutputs that are not" "present in the entityType."])) @@ -169,15 +168,6 @@ {:fromOutputs fromOutputs :workflow workflow} cause))))) -(defn ^:private entity-exists? - "True when the `entity` exists in the `workspace`." - [workspace entity] - (try - (firecloud/get-entity workspace entity) - (catch ExceptionInfo ex - (when (not= (-> ex ex-data :status) 404) - (throw ex))))) - (def ^:private entity-name-not-found-error-message (str/join \space ["Entity name not found:" "sink.identifer not in workflow outputs or inputs"])) diff --git a/api/src/wfl/source.clj b/api/src/wfl/source.clj index 41a7402d7..d8ea9a819 100644 --- a/api/src/wfl/source.clj +++ b/api/src/wfl/source.clj @@ -57,12 +57,13 @@ (fn [_transaction source] (:type source))) (defmulti update-source! - "Enqueue items onto the `workload`'s source queue to be consumed by a later processing - stage unless stopped, performing any external effects as necessary. - Implementations should avoid maintaining in-memory state and making long- - running external calls, favouring internal queues to manage such tasks - asynchronously between invocations. This function is called one or more - times after `start-source!` and may be called after `stop-source!`" + "Enqueue items onto the `workload`'s source queue to be consumed by a + later processing stage unless stopped, performing any external + effects as necessary. Implementations should avoid maintaining + in-memory state and making long- running external calls, favouring + internal queues to manage such tasks asynchronously between + invocations. This function is called one or more times after + `start-source!` and may be called after `stop-source!`" (fn [{:keys [source] :as _workload}] (:type source))) ;; source load/save operations diff --git a/api/test/wfl/integration/modules/aou_test.clj b/api/test/wfl/integration/modules/aou_test.clj index d66e047d5..9065cf1c3 100644 --- a/api/test/wfl/integration/modules/aou_test.clj +++ b/api/test/wfl/integration/modules/aou_test.clj @@ -1,16 +1,16 @@ (ns wfl.integration.modules.aou-test - (:require [clojure.spec.alpha :as s] - [clojure.test :refer [testing is deftest use-fixtures]] + (:require [clojure.test :refer [testing is deftest]] + [clojure.spec.alpha :as s] [wfl.integration.modules.shared :as shared] [wfl.jdbc :as jdbc] [wfl.module.aou :as aou] + [wfl.module.batch :as batch] [wfl.tools.fixtures :as fixtures] [wfl.tools.workloads :as workloads] - [wfl.util :as util] - [wfl.module.batch :as batch]) + [wfl.util :as util]) (:import [java.util UUID])) -(use-fixtures :once fixtures/temporary-postgresql-database) +(clojure.test/use-fixtures :once fixtures/temporary-postgresql-database) (defn mock-submit-workload [& _] (UUID/randomUUID)) (defn mock-update-statuses! [tx {:keys [items] :as workload}] @@ -25,44 +25,55 @@ (defn ^:private inc-version [sample] (update sample :analysis_version_number inc)) -(def count=1? (comp (partial == 1) count)) - (deftest test-append-to-aou - (with-redefs-fn {#'aou/submit-aou-workflow mock-submit-workload} - #(let [workload (workloads/execute-workload! (make-aou-workload-request)) - append-to-workload! (fn [xs] (workloads/append-to-workload! xs workload))] - (testing "appending a sample to the workload" - (let [response (append-to-workload! [workloads/aou-sample])] - (is (s/valid? ::aou/append-to-aou-response response)) - (is (count=1? response)))) - (testing "appending the same sample to the workload does nothing" - (is (= () (append-to-workload! [workloads/aou-sample])))) - (testing "incrementing analysis_version_number starts a new workload" - (is (count=1? (append-to-workload! [(inc-version workloads/aou-sample)])))) - (testing "only one workflow is started when there are multiple duplicates" - (is (count=1? - (append-to-workload! - (repeat 5 (inc-version (inc-version workloads/aou-sample))))))) - (testing "appending empty workload" - (let [response (append-to-workload! [])] - (is (s/valid? ::aou/append-to-aou-response response)) - (is (empty? response))))))) + (with-redefs [aou/submit-aou-workflow mock-submit-workload] + (let [request (make-aou-workload-request) + workload (workloads/execute-workload! request) + append! (fn [xs] (workloads/append-to-workload! xs workload))] + (testing "appending a sample to the workload" + (let [response (append! [workloads/aou-sample])] + (is (s/valid? ::aou/append-to-aou-response response)) + (is (== 1 (count response))))) + (testing "appending the same sample to the workload does nothing" + (is (= () (append! [workloads/aou-sample])))) + (testing "incrementing analysis_version_number starts a new workload" + (is (== 1 (count (append! [(inc-version workloads/aou-sample)]))))) + (testing "only one workflow is started when there are multiple duplicates" + (is (== 1 (count + (append! + (repeat + 5 (inc-version (inc-version workloads/aou-sample)))))))) + (testing "appending empty workload" + (let [response (append! [])] + (is (s/valid? ::aou/append-to-aou-response response)) + (is (empty? response)))) + (testing "/exec does not return a stopped workload" + (let [matched (workloads/execute-workload! request) + stopped (workloads/stop-workload! matched) + another (workloads/execute-workload! request)] + (is (= (:uuid workload) (:uuid matched) (:uuid stopped))) + (is (:stopped stopped)) + (is (:started another)) + (is (not (:stopped another))) + (is (not= (:id stopped) (:id another))) + (is (not= (:items stopped) (:items another))) + (is (not= (:uuid stopped) (:uuid another)))))))) (deftest test-append-to-aou-not-started - (with-redefs-fn {#'aou/submit-aou-workflow mock-submit-workload} - #(let [workload (workloads/create-workload! (make-aou-workload-request))] - (is (thrown? Exception (workloads/append-to-workload! - [workloads/aou-sample] - workload)))))) + (with-redefs [aou/submit-aou-workflow mock-submit-workload] + (let [workload (workloads/create-workload! (make-aou-workload-request))] + (is (thrown? Exception (workloads/append-to-workload! + [workloads/aou-sample] + workload)))))) (deftest test-append-to-stopped-aou-workload - (with-redefs-fn {#'aou/submit-aou-workflow mock-submit-workload} - #(as-> (workloads/create-workload! (make-aou-workload-request)) workload - (workloads/start-workload! workload) - (workloads/stop-workload! workload) - (is (thrown? Exception (workloads/append-to-workload! - [workloads/aou-sample] - workload)))))) + (with-redefs [aou/submit-aou-workflow mock-submit-workload] + (as-> (workloads/create-workload! (make-aou-workload-request)) workload + (workloads/start-workload! workload) + (workloads/stop-workload! workload) + (is (thrown? Exception (workloads/append-to-workload! + [workloads/aou-sample] + workload)))))) (deftest test-workload-state-transition (shared/run-workload-state-transition-test! (make-aou-workload-request))) @@ -74,18 +85,18 @@ (shared/run-retry-is-not-supported-test! (make-aou-workload-request))) (deftest test-aou-workload-not-finished-until-stopped - (with-redefs-fn {#'aou/submit-aou-workflow mock-submit-workload - #'batch/update-workflow-statuses! mock-update-statuses!} - #(let [workload (-> (make-aou-workload-request) - (workloads/execute-workload!) - (workloads/update-workload!))] - (is (not (:finished workload))) - (workloads/append-to-workload! [workloads/aou-sample] workload) - (let [workload (workloads/load-workload-for-uuid (:uuid workload))] - (is (every? (comp #{"Submitted"} :status) (workloads/workflows workload))) - (let [workload (workloads/update-workload! workload)] - (is (every? (comp #{"Succeeded"} :status) (workloads/workflows workload))) - (is (not (:finished workload)))))))) + (with-redefs [aou/submit-aou-workflow mock-submit-workload + batch/update-workflow-statuses! mock-update-statuses!] + (let [workload (-> (make-aou-workload-request) + (workloads/execute-workload!) + (workloads/update-workload!))] + (is (not (:finished workload))) + (workloads/append-to-workload! [workloads/aou-sample] workload) + (let [workload (workloads/load-workload-for-uuid (:uuid workload))] + (is (every? (comp #{"Submitted"} :status) (workloads/workflows workload))) + (let [workload (workloads/update-workload! workload)] + (is (every? (comp #{"Succeeded"} :status) (workloads/workflows workload))) + (is (not (:finished workload)))))))) ;; rr: GH-1071 (deftest test-exec-on-same-workload-request @@ -95,7 +106,7 @@ (workloads/execute-workload! request)))))) (deftest test-exec-on-similar-workload-request - (testing "output bucket slashes should be standardized to not create new workloads unnecessarily" + (testing "standardize output to not create new workloads unnecessarily" (let [request (make-aou-workload-request) slashified (update request :output util/slashify) deslashified (update request :output util/de-slashify)] diff --git a/api/test/wfl/integration/modules/sg_test.clj b/api/test/wfl/integration/modules/sg_test.clj index 30819f2cf..3b19eeb15 100644 --- a/api/test/wfl/integration/modules/sg_test.clj +++ b/api/test/wfl/integration/modules/sg_test.clj @@ -85,12 +85,12 @@ (is (:uuid workflow)) (is (:status workflow)) (is (:updated workflow)))] - (with-redefs-fn {#'batch/submit-workload! mock-submit-workload} - #(let [workload (-> (the-sg-workload-request) - workloads/create-workload! - workloads/start-workload!)] - (is (:started workload)) - (run! go! (workloads/workflows workload)))))) + (with-redefs [batch/submit-workload! mock-submit-workload] + (let [workload (-> (the-sg-workload-request) + workloads/create-workload! + workloads/start-workload!)] + (is (:started workload)) + (run! go! (workloads/workflows workload)))))) (deftest test-hidden-inputs (testing "google_account_vault_path and vault_token_path are not in inputs" @@ -117,12 +117,12 @@ (is (:supports_inputs in)) (UUID/randomUUID))) (verify-submitted-inputs [_ _ inputs _ _] (map submit inputs))] - (with-redefs-fn {#'cromwell/submit-workflows verify-submitted-inputs} - #(-> (the-sg-workload-request) - (assoc-in [:common :inputs] {:overwritten false - :supports_common_inputs true}) - (update :items (partial map overmap)) - workloads/execute-workload!))))) + (with-redefs [cromwell/submit-workflows verify-submitted-inputs] + (-> (the-sg-workload-request) + (assoc-in [:common :inputs] {:overwritten false + :supports_common_inputs true}) + (update :items (partial map overmap)) + workloads/execute-workload!))))) (deftest test-workflow-options (let [{:keys [output] :as request} (the-sg-workload-request)] @@ -138,14 +138,14 @@ (let [defaults (sg/make-workflow-options url output)] (is (= defaults (select-keys options (keys defaults)))) (map (fn [_] (UUID/randomUUID)) inputs)))] - (with-redefs-fn {#'cromwell/submit-workflows verify-submitted-options} - #(-> request - (assoc-in [:common :options] {:overwritten false - :supports_common_options true}) - (update :items (partial map overmap)) - workloads/execute-workload! - workloads/workflows - (->> (map (comp verify-workflow-options :options)))))))) + (with-redefs [cromwell/submit-workflows verify-submitted-options] + (-> request + (assoc-in [:common :options] {:overwritten false + :supports_common_options true}) + (update :items (partial map overmap)) + workloads/execute-workload! + workloads/workflows + (->> (map (comp verify-workflow-options :options)))))))) (deftest test-empty-workflow-options (letfn [(go! [workflow] (is (absent? workflow :options))) @@ -167,7 +167,7 @@ (is md) (letfn [(ok? [v] (or (integer? v) (and (string? v) (seq v))))] (is (every? ok? ((apply juxt clio/add-keys) md)))) - "-MRu7X3zEzoGeFAVSF-J") + "-Is-A-Clio-Record-Id") (defn ^:private mock-clio-failed "Fail when `_clio` called with metadata `_md`." @@ -307,6 +307,7 @@ :else false))))) (defn ^:private test-clio-updates + "Assert that Clio is updated correctly." [] (let [{:keys [items] :as request} (the-sg-workload-request)] (-> request @@ -319,47 +320,88 @@ (is (= (count items) (-> workload workloads/workflows count)))))))) +(defn ^:private mock-add-bam-suggest-force=true + "Throw rejecting the `_md` update to `_clio` and suggesting force=true." + [_clio {:keys [bam_path version] :as _md}] + (if (= 23 version) + (let [adding (str/join \space ["Adding this document will overwrite" + "the following existing metadata:"]) + field "Field: Chain(Left(bam_path))," + oldval "Old value: \"gs://bam/oldval.bam\"," + newval (format "New value: \"%s\"." bam_path) + force "Use 'force=true' to overwrite the existing data." + message (str/join \space [field oldval newval force]) + body (str/join \newline [adding message])] + (throw (ex-info "clj-http: status 400" {:body body + :reason-phrase "Bad Request" + :status 400}))) + (is (= 24 version) "-Is-A-Clio-Record-Id"))) + +(defn ^:private mock-add-bam-throw-something-else + "Throw on the `_md` update to `_clio` without suggesting force=true." + [_clio _md] + (throw (ex-info "clj-http: status 500" {:body "You goofed!" + :reason-phrase "Blame tbl." + :status 500}))) + +(deftest test-handle-add-bam-force=true + (testing "Retry add-bam when Clio suggests force=true." + (with-redefs [clio/add-bam mock-add-bam-suggest-force=true + clio/query-bam mock-clio-query-bam-missing + clio/query-cram mock-clio-query-cram-found + cromwell/metadata mock-cromwell-metadata-succeeded + cromwell/query mock-cromwell-query-succeeded + cromwell/submit-workflows mock-cromwell-submit-workflows + gcs/upload-content mock-gcs-upload-content] + (test-clio-updates))) + (testing "Do not retry when Clio rejects add-bam for another reason." + (with-redefs [clio/add-bam mock-add-bam-throw-something-else + clio/query-bam mock-clio-query-bam-missing + clio/query-cram mock-clio-query-cram-found + cromwell/metadata mock-cromwell-metadata-succeeded + cromwell/query mock-cromwell-query-succeeded + cromwell/submit-workflows mock-cromwell-submit-workflows + gcs/upload-content mock-gcs-upload-content] + (is (thrown-with-msg? Exception #"clj-http: status 500" (test-clio-updates)))))) + (deftest test-clio-updates-bam-found (testing "Clio not updated if outputs already known." - (with-redefs-fn - {#'clio/add-bam mock-clio-add-bam-found - #'clio/query-bam mock-clio-query-bam-found - #'clio/query-cram mock-clio-query-cram-found - #'cromwell/metadata mock-cromwell-metadata-succeeded - #'cromwell/query mock-cromwell-query-succeeded - #'cromwell/submit-workflows mock-cromwell-submit-workflows - #'gcs/upload-content mock-gcs-upload-content-fail} - test-clio-updates))) + (with-redefs [clio/add-bam mock-clio-add-bam-found + clio/query-bam mock-clio-query-bam-found + clio/query-cram mock-clio-query-cram-found + cromwell/metadata mock-cromwell-metadata-succeeded + cromwell/query mock-cromwell-query-succeeded + cromwell/submit-workflows mock-cromwell-submit-workflows + gcs/upload-content mock-gcs-upload-content-fail] + (test-clio-updates)))) (deftest test-clio-updates-bam-missing (testing "Clio updated after workflows finish." - (with-redefs-fn - {#'clio/add-bam mock-clio-add-bam-missing - #'clio/query-bam mock-clio-query-bam-missing - #'clio/query-cram mock-clio-query-cram-found - #'cromwell/metadata mock-cromwell-metadata-succeeded - #'cromwell/query mock-cromwell-query-succeeded - #'cromwell/submit-workflows mock-cromwell-submit-workflows - #'gcs/upload-content mock-gcs-upload-content} - test-clio-updates))) + (with-redefs [clio/add-bam mock-clio-add-bam-missing + clio/query-bam mock-clio-query-bam-missing + clio/query-cram mock-clio-query-cram-found + cromwell/metadata mock-cromwell-metadata-succeeded + cromwell/query mock-cromwell-query-succeeded + cromwell/submit-workflows mock-cromwell-submit-workflows + gcs/upload-content mock-gcs-upload-content] + (test-clio-updates)))) (deftest test-clio-updates-cromwell-failed (testing "Clio not updated after workflows fail." - (with-redefs-fn - {#'clio/add-bam mock-clio-failed - #'clio/query-bam mock-clio-failed - #'clio/query-cram mock-clio-failed - #'cromwell/metadata mock-cromwell-metadata-failed - #'cromwell/query mock-cromwell-query-failed - #'cromwell/submit-workflows mock-cromwell-submit-workflows - #'gcs/upload-content mock-gcs-upload-content-fail} - test-clio-updates))) + (with-redefs [clio/add-bam mock-clio-failed + clio/query-bam mock-clio-failed + clio/query-cram mock-clio-failed + cromwell/metadata mock-cromwell-metadata-failed + cromwell/query mock-cromwell-query-failed + cromwell/submit-workflows mock-cromwell-submit-workflows + gcs/upload-content mock-gcs-upload-content-fail] + (test-clio-updates)))) (defn workflow-postcheck [output {:keys [uuid] :as _workflow}] (let [md (cromwell/metadata @workloads/cromwell-url uuid) bam (get-in md [:outputs :GDCWholeGenomeSomaticSingleSample.bam]) - bam_path (sg/final_workflow_outputs_dir_hack output bam)] + bam_path (#'sg/final_workflow_outputs_dir_hack output bam)] (is (seq (clio/query-bam @workloads/clio-url {:bam_path bam_path}))))) (defmethod workloads/postcheck sg/pipeline postcheck-sg-workload @@ -375,20 +417,22 @@ (run! update! (wfl.api.workloads/workflows tx workload)))) (deftest test-workload-state-transition - (let [count (atom 0) - increment-count (fn [& _] (swap! count inc))] - (with-redefs-fn - {#'cromwell/submit-workflows mock-cromwell-submit-workflows - #'batch/batch-update-workflow-statuses! (partial mock-batch-update-workflow-statuses! "Succeeded") - #'sg/register-workload-in-clio increment-count} - #(shared/run-workload-state-transition-test! (the-sg-workload-request))) + (let [count (atom 0) + increment (fn [& _] (swap! count inc)) + succeed (partial mock-batch-update-workflow-statuses! "Succeeded")] + (with-redefs + [cromwell/submit-workflows mock-cromwell-submit-workflows + batch/batch-update-workflow-statuses! succeed + sg/register-workload-in-clio increment] + (shared/run-workload-state-transition-test! (the-sg-workload-request))) (is (== 1 @count) "Clio was updated more than once"))) (deftest test-stop-workload-state-transition (shared/run-stop-workload-state-transition-test! (the-sg-workload-request))) (deftest test-retry-workflows-supported - (with-redefs-fn - {#'cromwell/submit-workflows mock-cromwell-submit-workflows - #'batch/batch-update-workflow-statuses! (partial mock-batch-update-workflow-statuses! "Failed")} - #(shared/run-workload-state-transition-test! (the-sg-workload-request)))) + (let [fail (partial mock-batch-update-workflow-statuses! "Failed")] + (with-redefs + [cromwell/submit-workflows mock-cromwell-submit-workflows + batch/batch-update-workflow-statuses! fail] + (shared/run-workload-state-transition-test! (the-sg-workload-request))))) diff --git a/api/test/wfl/integration/sinks/workspace_sink_test.clj b/api/test/wfl/integration/sinks/workspace_sink_test.clj index c01c4387f..694e66c4d 100644 --- a/api/test/wfl/integration/sinks/workspace_sink_test.clj +++ b/api/test/wfl/integration/sinks/workspace_sink_test.clj @@ -13,56 +13,54 @@ (:import [clojure.lang ExceptionInfo] [wfl.util UserException])) -;; Workspace -(def ^:private testing-namespace "wfl-dev") -(def ^:private testing-workspace (str testing-namespace "/" "CDC_Viral_Sequencing")) +(def ^:private entity-type "flowcell") +(def ^:private entity-name "test") -;; Entity -(def ^:private testing-entity-type "flowcell") -(def ^:private testing-entity-name "test") - -(let [new-env {"WFL_FIRECLOUD_URL" "https://api.firecloud.org" - "WFL_RAWLS_URL" "https://rawls.dsde-prod.broadinstitute.org"}] - (use-fixtures :once - (fixtures/temporary-environment new-env) - fixtures/temporary-postgresql-database)) +(use-fixtures :once + (fixtures/temporary-environment + {"WFL_FIRECLOUD_URL" "https://api.firecloud.org" + "WFL_RAWLS_URL" "https://rawls.dsde-prod.broadinstitute.org"}) + fixtures/temporary-postgresql-database) ;; Validation tests (deftest test-validate-terra-workspace-sink-with-valid-sink-request (is (sink/terra-workspace-sink-validate-request-or-throw {:name @#'sink/terra-workspace-sink-name - :workspace testing-workspace + :workspace "wfl-dev/CDC_Viral_Sequencing" :entityType "assemblies" :identity "Who cares?" :fromOutputs {:assemblies_id "foo"}}))) (deftest test-validate-terra-workspace-sink-throws-on-invalid-sink-entity-type (is (thrown-with-msg? - UserException (re-pattern sink/unknown-entity-type-error-message) + UserException + (re-pattern @#'sink/unknown-entity-type-error-message) (sink/terra-workspace-sink-validate-request-or-throw {:name @#'sink/terra-workspace-sink-name - :workspace testing-workspace + :workspace "wfl-dev/CDC_Viral_Sequencing" :entityType "does_not_exist" :identity "Who cares?" :fromOutputs {}})))) (deftest test-validate-terra-workspace-sink-throws-on-malformed-fromOutputs (is (thrown-with-msg? - UserException (re-pattern sink/terra-workspace-malformed-from-outputs-message) + UserException + (re-pattern @#'sink/terra-workspace-malformed-from-outputs-message) (sink/terra-workspace-sink-validate-request-or-throw {:name @#'sink/terra-workspace-sink-name - :workspace testing-workspace + :workspace "wfl-dev/CDC_Viral_Sequencing" :entityType "assemblies" :identity "Who cares?" :fromOutputs "geoff"})))) (deftest test-validate-terra-workspace-sink-throws-on-unknown-fromOutputs-attributes (is (thrown-with-msg? - UserException (re-pattern sink/unknown-attributes-error-message) + UserException + (re-pattern @#'sink/unknown-attributes-error-message) (sink/terra-workspace-sink-validate-request-or-throw {:name @#'sink/terra-workspace-sink-name - :workspace testing-workspace + :workspace "wfl-dev/CDC_Viral_Sequencing" :entityType "assemblies" :identity "Who cares?" :fromOutputs {:does_not_exist "genbank_source_table"}})))) @@ -84,7 +82,7 @@ (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] (->> {:name "Terra Workspace" :workspace "workspace-ns/workspace-name" - :entityType testing-entity-type + :entityType entity-type :fromOutputs (resources/read-resource "sarscov2_illumina_full/entity-from-outputs.edn") :identifier identifier @@ -95,29 +93,26 @@ (verify-upsert-request [workspace [[type name _attributes] :as _entities]] (is (= "workspace-ns/workspace-name" workspace)) - (is (= testing-entity-type type)) - (is (= testing-entity-name name))) - (throw-if-called [fname & args] + (is (= entity-type type)) + (is (= entity-name name))) + (throw-when [fname & args] (throw (ex-info (str fname " should not have been called") {:called-with args})))] (let [workflow {:uuid "2768b29e-c808-4bd6-a46b-6c94fd2a67aa" :status "Succeeded" :outputs (-> "sarscov2_illumina_full/outputs.edn" resources/read-resource - (assoc :flowcell_id testing-entity-name))} + (assoc :flowcell_id entity-name))} executor (make-queue-from-list [[nil workflow]]) sink-throws (sink "not-a-workflow-input-or-output") sink-updates (sink "flowcell_id") - workload-throws {:executor executor - :sink sink-throws} - workload-updates {:executor executor - :sink sink-updates}] + workload-throws {:executor executor :sink sink-throws} + workload-updates {:executor executor :sink sink-updates} + batch-upsert (partial throw-when "rawls/batch-upsert") + delete-entities (partial throw-when "firecloud/delete-entities")] (testing "Sink identifier matches no workflow output or input" - (with-redefs - [rawls/batch-upsert (partial throw-if-called "rawls/batch-upsert") - sink/entity-exists? (partial throw-if-called "sink/entity-exists?") - firecloud/delete-entities - (partial throw-if-called "firecloud/delete-entities")] + (with-redefs [rawls/batch-upsert batch-upsert + firecloud/delete-entities delete-entities] (is (thrown-with-msg? ExceptionInfo (re-pattern @#'sink/entity-name-not-found-error-message) @@ -129,12 +124,9 @@ (let [records (->> sink-throws :details (postgres/get-table tx))] (is (empty? records) "No sink records should have been written")))) (testing "Sink identifier matches workflow output" - (with-redefs-fn - {#'rawls/batch-upsert verify-upsert-request - #'sink/entity-exists? (constantly false) - #'firecloud/delete-entities - (partial throw-if-called "firecloud/delete-entities")} - #(sink/update-sink! workload-updates)) + (with-redefs [rawls/batch-upsert verify-upsert-request + firecloud/delete-entities delete-entities] + (sink/update-sink! workload-updates)) (is (zero? (stage/queue-length executor)) "The workflow was not consumed") (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] (let [[record & rest] (->> sink-updates @@ -144,7 +136,7 @@ (is (empty? rest) "More than one record was written") (is (= (:uuid workflow) (:workflow record)) "The workflow UUID was not written") - (is (= testing-entity-name (:entity record)) + (is (= entity-name (:entity record)) "The entity was not correct"))))))) (deftest test-sinking-resubmitted-workflow @@ -152,17 +144,17 @@ (fn [workspace] (let [workflow1 {:uuid "2768b29e-c808-4bd6-a46b-6c94fd2a67aa" :status "Succeeded" - :outputs {:run_id testing-entity-name + :outputs {:run_id entity-name :results ["aligned-thing.cram"]}} workflow2 {:uuid "2768b29e-c808-4bd6-a46b-6c94fd2a67ab" :status "Succeeded" - :outputs {:run_id testing-entity-name + :outputs {:run_id entity-name :results ["another-aligned-thing.cram"]}} executor (make-queue-from-list [[nil workflow1] [nil workflow2]]) sink (jdbc/with-db-transaction [tx (postgres/wfl-db-config)] (->> {:name "Terra Workspace" :workspace workspace - :entityType testing-entity-type + :entityType entity-type :fromOutputs {:aligned_crams "results"} :identifier "run_id" :skipValidation true} @@ -175,10 +167,9 @@ (is (== 1 (stage/queue-length executor)) "one workflow should have been consumed") (let [{:keys [entityType name attributes]} - (firecloud/get-entity - workspace [testing-entity-type testing-entity-name])] - (is (= testing-entity-type entityType)) - (is (= testing-entity-name name)) + (firecloud/get-entity workspace [entity-type entity-name])] + (is (= entity-type entityType)) + (is (= entity-name name)) (is (== 1 (count attributes))) (is (= [:aligned_crams {:itemsType "AttributeValue" :items ["aligned-thing.cram"]}] @@ -186,14 +177,13 @@ (sink/update-sink! workload) (is (zero? (stage/queue-length executor)) "one workflow should have been consumed") - (let [entites (firecloud/list-entities workspace testing-entity-type)] + (let [entites (firecloud/list-entities workspace entity-type)] (is (== 1 (count entites)) "No new entities should have been added")) (let [{:keys [entityType name attributes]} - (firecloud/get-entity - workspace [testing-entity-type testing-entity-name])] - (is (= testing-entity-type entityType)) - (is (= testing-entity-name name)) + (firecloud/get-entity workspace [entity-type entity-name])] + (is (= entity-type entityType)) + (is (= entity-name name)) (is (== 1 (count attributes))) (is (= [:aligned_crams {:itemsType "AttributeValue" :items ["another-aligned-thing.cram"]}] diff --git a/api/test/wfl/tools/fixtures.clj b/api/test/wfl/tools/fixtures.clj index 5f0b7ced6..6289025e3 100644 --- a/api/test/wfl/tools/fixtures.clj +++ b/api/test/wfl/tools/fixtures.clj @@ -86,15 +86,6 @@ (finally (reset! @#'postgres/testing-db-overrides prev))))))) -(defn create-local-database-for-testing - "Create and run liquibase on a PostgreSQL database named `dbname`. Assumes - that `dbname` does not already exist. - Notes: - - This is intended for interactive development in a REPL. - - The new database will NOT be cleaned up automatically." - [dbname] - (-> dbname create-local-database setup-local-database)) - (defmacro with-fixtures "Use 0 or more `fixtures` in `use-fixtures`. Parameters diff --git a/api/test/wfl/unit/executor_test.clj b/api/test/wfl/unit/executor_test.clj index 5838e807f..d3b598e6c 100644 --- a/api/test/wfl/unit/executor_test.clj +++ b/api/test/wfl/unit/executor_test.clj @@ -134,12 +134,11 @@ {:status "Failed"} {:status "Succeeded"}] workload {:executor {:workspace "workspaceNs/workspaceName"}}] - (letfn [(mock-workflow-finished-slack-msg - [_executor {:keys [status] :as _record}] + (letfn [(slack-finished [_executor {:keys [status] :as _record}] (is (cromwell/final? status) "Should not notify for non-final workflows"))] - (with-redefs - [executor/workflow-finished-slack-msg mock-workflow-finished-slack-msg - slack/notify-watchers (constantly nil)] - (is (= records (#'executor/notify-on-workflow-completion workload records)) + (with-redefs [executor/workflow-finished-slack-msg slack-finished + slack/notify-watchers (constantly nil)] + (is (= records + (#'executor/notify-on-workflow-completion workload records)) "Should return all passed-in records"))))) diff --git a/api/test/wfl/unit/logging_test.clj b/api/test/wfl/unit/logging_test.clj index 8da9c1b6c..85a7e9d2c 100644 --- a/api/test/wfl/unit/logging_test.clj +++ b/api/test/wfl/unit/logging_test.clj @@ -35,8 +35,7 @@ (deftest severity-level-filtering-test (testing "logging level ignores lesser severities" - (with-redefs - [log/active-level-predicate (atom (:info @#'log/active-map))] + (with-redefs [log/active-level-predicate (atom (:info @#'log/active-map))] (is (str/blank? (with-out-str (log/debug "Debug Message")))) (is (logged? (with-out-str (log/info "Info Message")) :info "Info Message"))))) diff --git a/api/test/wfl/unit/modules/aou_test.clj b/api/test/wfl/unit/modules/aou_test.clj index d80755b2b..6d81a5639 100644 --- a/api/test/wfl/unit/modules/aou_test.clj +++ b/api/test/wfl/unit/modules/aou_test.clj @@ -1,107 +1,99 @@ (ns wfl.unit.modules.aou-test (:require [clojure.test :refer [deftest is testing]] - [clojure.set :as set] [wfl.module.aou :as aou])) -(def ^:private cromwell-url - "https://cromwell-gotc-auth.gotc-dev.broadinstitute.org") +(def ^:private per-sample + "Per-sample input keys for AoU workflows." + [:analysis_version_number + :bead_pool_manifest_file + :call_rate_threshold + :chip_well_barcode + :cluster_file + :environment + :extended_chip_manifest_file + :gender_cluster_file + :green_idat_cloud_path + :minor_allele_frequency_file + :params_file + :red_idat_cloud_path + :reported_gender + :sample_alias + :sample_lsid + :zcall_thresholds_file]) -(deftest test-cromwell->inputs+options - (testing "Map cromwell URL to inputs+options correctly" - (is (= (:environment (aou/cromwell->inputs+options cromwell-url)) "dev")))) +(def ^:private other-keys + "Input keys that are not per-sample." + [:contamination_controls_vcf + :dbSNP_vcf + :dbSNP_vcf_index + :disk_size + :fingerprint_genotypes_vcf_file + :fingerprint_genotypes_vcf_index_file + :haplotype_database_file + :preemptible_tries + :ref_dict + :ref_fasta + :ref_fasta_index + :subsampled_metrics_interval_list + :variant_rsids_file + :vault_token_path]) + +(def ^:private control-keys + "Keys that mark control samples." + [:control_sample_vcf_index_file + :control_sample_intervals_file + :control_sample_vcf_file + :control_sample_name]) + +(def ^:private per-sample-inputs + "Bogus per-sample input for AoU workflows." + (-> per-sample + (zipmap (map name per-sample)) + (assoc :analysis_version_number 23))) (deftest test-make-cromwell-labels - (let [sample {:analysis_version_number 1 - :bead_pool_manifest_file "foo" - :chip_well_barcode "chip" - :cluster_file "foo" - :extended_chip_manifest_file "foo" - :gender_cluster_file "foo" - :green_idat_cloud_path "foo" - :minor_allele_frequency_file "foo" - :params_file "foo" - :red_idat_cloud_path "foo" - :reported_gender "foo" - :sample_alias "foo" - :sample_lsid "foo" - :zcall_thresholds_file "foo"} - workload->label {:workload "bogus-workload"} - expected (merge {:wfl "AllOfUsArrays" - :analysis_version_number 1 - :chip_well_barcode "chip"} - workload->label)] - (testing "make-labels can return correct workflow labels" - (is (= (aou/make-labels sample workload->label) expected) "label map is not made as expected")))) + (testing "make-labels can return correct workflow labels" + (let [labels {:workload "bogus-workload"}] + (is (= (aou/make-labels per-sample-inputs labels) + (-> per-sample-inputs + (select-keys @#'aou/primary-keys) + (merge {:wfl "AllOfUsArrays"} labels))) + "label map is not made as expected")))) + +(defn ^:private arraysify + "The keywords in KWS prefixed with `Arrays.`." + [kws] + (map (fn [k] (keyword (str "Arrays." (name k)))) kws)) (deftest test-aou-inputs-preparation - (let [expected-per-sample-inputs {:analysis_version_number "foo" - :bead_pool_manifest_file "foo" - :call_rate_threshold "foo" - :chip_well_barcode "foo" - :cluster_file "foo" - :extended_chip_manifest_file "foo" - :gender_cluster_file "foo" - :green_idat_cloud_path "foo" - :minor_allele_frequency_file "foo" - :params_file "foo" - :red_idat_cloud_path "foo" - :reported_gender "foo" - :sample_alias "foo" - :sample_lsid "foo" - :zcall_thresholds_file "foo" - :environment "foo"} - redundant-per-sample-inputs-inputs (merge expected-per-sample-inputs {:extra "bar"}) - missing-per-sample-inputs-inputs (dissoc expected-per-sample-inputs :analysis_version_number) - all-expected-keys-no-control #{:Arrays.preemptible_tries - :Arrays.environment - :Arrays.ref_dict - :Arrays.params_file - :Arrays.subsampled_metrics_interval_list - :Arrays.chip_well_barcode - :Arrays.sample_alias - :Arrays.variant_rsids_file - :Arrays.ref_fasta_index - :Arrays.dbSNP_vcf - :Arrays.disk_size - :Arrays.contamination_controls_vcf - :Arrays.green_idat_cloud_path - :Arrays.minor_allele_frequency_file - :Arrays.fingerprint_genotypes_vcf_index_file - :Arrays.vault_token_path - :Arrays.reported_gender - :Arrays.dbSNP_vcf_index - :Arrays.extended_chip_manifest_file - :Arrays.zcall_thresholds_file - :Arrays.sample_lsid - :Arrays.red_idat_cloud_path - :Arrays.gender_cluster_file - :Arrays.ref_fasta - :Arrays.bead_pool_manifest_file - :Arrays.analysis_version_number - :Arrays.fingerprint_genotypes_vcf_file - :Arrays.cluster_file - :Arrays.call_rate_threshold - :Arrays.haplotype_database_file} - all-expected-keys (set/union all-expected-keys-no-control - #{:Arrays.control_sample_vcf_index_file - :Arrays.control_sample_intervals_file - :Arrays.control_sample_vcf_file - :Arrays.control_sample_name})] + (let [cromwell-url "https://cromwell-gotc-auth.gotc-dev.broadinstitute.org" + extra-inputs (merge per-sample-inputs {:extra "extra"}) + inputs-missing (dissoc per-sample-inputs :analysis_version_number) + no-controls (-> other-keys (concat per-sample) arraysify set) + all-keys (-> control-keys arraysify (concat no-controls) set)] (testing "aou filters out non-necessary keys for per-sample-inputs" - (is (= expected-per-sample-inputs (aou/get-per-sample-inputs redundant-per-sample-inputs-inputs)))) + (is (= per-sample-inputs (aou/get-per-sample-inputs extra-inputs)))) (testing "aou throws for missing keys for per-sample-inputs" - (is (thrown? Exception (aou/get-per-sample-inputs missing-per-sample-inputs-inputs)))) + (is (thrown? Exception (aou/get-per-sample-inputs inputs-missing)))) (testing "aou prepares all necessary keys" - (is (= all-expected-keys-no-control (set (keys (aou/make-inputs cromwell-url expected-per-sample-inputs)))))) - (testing "aou supplies merges environment from inputs with default" - (is (= "dev" (:Arrays.environment (aou/make-inputs cromwell-url (dissoc expected-per-sample-inputs :environment))))) - (is (= "foo" (:Arrays.environment (aou/make-inputs cromwell-url expected-per-sample-inputs)))) - (is (= all-expected-keys-no-control - (set (keys (aou/make-inputs cromwell-url (dissoc expected-per-sample-inputs :environment))))))) + (is (= no-controls (-> cromwell-url + (aou/make-inputs per-sample-inputs) + keys set)))) + (testing "aou merges environment from inputs with default" + (let [no-environment (dissoc per-sample-inputs :environment)] + (is (= "dev" (->> no-environment + (aou/make-inputs cromwell-url) + :Arrays.environment))) + (is (= "environment" (->> per-sample-inputs + (aou/make-inputs cromwell-url) + :Arrays.environment))) + (is (= no-controls (->> no-environment + (aou/make-inputs cromwell-url) + keys set))))) (testing "aou prepares all necessary keys plus optional keys" - (is (= all-expected-keys (set (keys (aou/make-inputs cromwell-url - (merge expected-per-sample-inputs - {:control_sample_vcf_index_file "foo" - :control_sample_intervals_file "foo" - :control_sample_vcf_file "foo" - :control_sample_name "foo"}))))))))) + (is (= all-keys (->> control-keys + (map name) + (zipmap control-keys) + (merge per-sample-inputs) + (aou/make-inputs cromwell-url) + keys set)))))) diff --git a/api/test/wfl/unit/modules/staged_test.clj b/api/test/wfl/unit/modules/staged_test.clj index edb956425..53d7d5789 100644 --- a/api/test/wfl/unit/modules/staged_test.clj +++ b/api/test/wfl/unit/modules/staged_test.clj @@ -3,9 +3,9 @@ [clojure.test :refer [deftest is testing]] [wfl.service.datarepo :as datarepo] [wfl.source :as source] - [wfl.tools.workloads :as workloads]) - (:import [java.time OffsetDateTime ZoneId] - [java.lang Math])) + [wfl.tools.workloads :as workloads] + [wfl.util :as util]) + (:import [java.lang Math])) (def ^:private readers-list ["hornet@firecloud.org"]) @@ -25,7 +25,7 @@ :table "flowcell" :snapshotReaders readers-list}} row-ids (take mock-new-rows-size (range)) - now-obj (OffsetDateTime/now (ZoneId/of "UTC")) + now-obj (util/utc-now) shards->snapshot-requests (with-redefs-fn {#'datarepo/create-snapshot-job mock-create-snapshot-job} diff --git a/api/test/wfl/unit/slack_test.clj b/api/test/wfl/unit/slack_test.clj index ae2ccf6a1..b02e305e8 100644 --- a/api/test/wfl/unit/slack_test.clj +++ b/api/test/wfl/unit/slack_test.clj @@ -17,13 +17,13 @@ (is (= message-count (count (seq @notifier))))))) (deftest test-dispatch-does-not-throw - (let [queue (conj (PersistentQueue/EMPTY) - (make-notification 'test-dispatch-does-not-throw))] - (with-redefs - [slack/post-message (constantly {:ok false :error "something_bad"})] + (let [queue (conj (PersistentQueue/EMPTY) + (make-notification 'test-dispatch-does-not-throw)) + post-error (constantly {:ok false :error "something_bad"}) + post-throw #(throw (ex-info "Unexpected throwable" {}))] + (with-redefs [slack/post-message post-error] (is (= (seq queue) (seq (slack/dispatch-notification queue))) "Queue should remain when posting Slack message returns error")) - (with-redefs - [slack/post-message #(throw (ex-info "Unexpected throwable" {}))] + (with-redefs [slack/post-message post-throw] (is (= (seq queue) (seq (slack/dispatch-notification queue))) "Queue should remain when posting Slack message throws")))) diff --git a/version b/version index 41915c799..5a03fb737 100644 --- a/version +++ b/version @@ -1 +1 @@ -0.19.1 +0.20.0