Skip to content
This repository has been archived by the owner on Nov 20, 2024. It is now read-only.

Commit

Permalink
Deploy WFL 0.20.0 to pick up changes from GH-1674 and GH-1681. (#621)
Browse files Browse the repository at this point in the history
* [GH-1674] Do not add new workflows to a stopped AoU workload. (#617)
* [GH-1681] SG/GDC is reprocessing CRAMs and running into Clio update conflicts. (#620)
* Update CHANGELOG and bump version to 0.20.0.
  • Loading branch information
tbl3rd authored Sep 1, 2022
1 parent ffd7409 commit a35cd10
Show file tree
Hide file tree
Showing 16 changed files with 459 additions and 411 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Release 0.20.0
- [GH-1681] SG/GDC is reprocessing CRAMs and running into Clio update conflicts. ([#620](https://github.com/broadinstitute/wfl/pull/620))
- [GH-1674] wfl.module.aou/add-aou-workload! should not add to a stopped workload. ([#617](https://github.com/broadinstitute/wfl/pull/617))

# Release 0.19.1
- [GH-1686] Deploy WFL to point at ExternalExomeReprocessing_v3.1.7 and pick up picard-cloud:2.26.10. ([#618](https://github.com/broadinstitute/wfl/pull/618))
- [GH-1667] Don't delete a Firecloud entity before a Rawls batchUpsert. ([#616](https://github.com/broadinstitute/wfl/pull/616))
Expand Down
189 changes: 94 additions & 95 deletions api/src/wfl/module/aou.clj
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
(ns wfl.module.aou
"Process Arrays for the All Of Us project."
(:require [clojure.spec.alpha :as s]
[clojure.string :as str]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.all :as all]
[wfl.module.batch :as batch]
[wfl.references :as references]
[wfl.service.cromwell :as cromwell]
[wfl.service.google.storage :as gcs]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl])
(:require [clojure.spec.alpha :as s]
[clojure.string :as str]
[wfl.api.workloads :as workloads :refer [defoverload]]
[wfl.jdbc :as jdbc]
[wfl.log :as log]
[wfl.module.all :as all]
[wfl.module.batch :as batch]
[wfl.references :as references]
[wfl.service.cromwell :as cromwell]
[wfl.service.postgres :as postgres]
[wfl.util :as util]
[wfl.wfl :as wfl])
(:import [java.sql Timestamp]
[java.time OffsetDateTime]
[java.time Instant]
[java.util UUID]))

;; This must agree with the AoU cloud function.
;;
(def pipeline "AllOfUsArrays")

;; specs
Expand Down Expand Up @@ -70,8 +71,7 @@
{:environment "prod"
:vault_token_path "gs://broad-dsp-gotc-arrays-prod-tokens/arrayswdl.token"}])

;; visible for testing
(defn cromwell->inputs+options
(defn ^:private cromwell->inputs+options
"Map cromwell URL to workflow inputs and options for submitting an AllOfUs Arrays workflow.
The returned environment string here is just a default, input file may specify override."
[url]
Expand Down Expand Up @@ -135,23 +135,26 @@
(update :environment str/lower-case)
(util/prefix-keys :Arrays.)))

;; visible for testing
(def default-options
(def ^:private default-options
{:use_relative_output_paths true
:read_from_cache true
:write_to_cache true
:default_runtime_attributes {:zones "us-central1-a us-central1-b us-central1-c us-central1-f"
:maxRetries 1}})
:default_runtime_attributes
{:zones "us-central1-a us-central1-b us-central1-c us-central1-f"
:maxRetries 1}})

;; Note: also the composite PRIMARY KEY columns of each per-workload
;; AllOfUsArrays table (see the CREATE TABLE DDL), though declared there
;; in the opposite order.
(def ^:private primary-keys
"These uniquely identify a sample so use them as a primary key."
[:chip_well_barcode :analysis_version_number])

(defn make-labels
"Return labels for aou arrays pipeline from PER-SAMPLE-INPUTS and OTHER-LABELS."
[per-sample-inputs other-labels]
(merge cromwell-label-map
(select-keys per-sample-inputs [:analysis_version_number :chip_well_barcode])
(select-keys per-sample-inputs primary-keys)
other-labels))

;; visible for testing
(defn submit-aou-workflow
(defn ^:private submit-aou-workflow
"Submit one workflow to Cromwell URL given PER-SAMPLE-INPUTS,
WORKFLOW-OPTIONS and OTHER-LABELS."
[url per-sample-inputs workflow-options other-labels]
Expand All @@ -162,80 +165,72 @@
workflow-options
(make-labels per-sample-inputs other-labels)))

;; This needs the id generated by the jdbc/insert! to name a new table,
;; so update the workload table after creating the AllOfUsArrays table.
;; Update the workload table row with the name of the AllOfUsArrays table.
;; https://www.postgresql.org/docs/current/datatype-numeric.html#DATATYPE-SERIAL
;;
(defn add-aou-workload!
"Use transaction `tx` to record the workflow described in `_request`.
Add the workflow to an old workload matching the `_request` parameters.
Otherwise create a new workload to match those parameters. "
[tx {:keys [creator executor pipeline project output watchers] :as _request}]
(gcs/parse-gs-url output)
(let [slashified-output (util/slashify output)
{:keys [release path]} workflow-wdl
{:keys [commit version]} (wfl/get-the-version)
query-string (str/join \space
["SELECT * FROM workload"
"WHERE project = ?"
"AND pipeline = ?::pipeline"
"AND release = ?"
"AND output = ?"])
workloads (jdbc/query tx [query-string
project pipeline
release slashified-output])
n (count workloads)]
;; The jdbc/insert! below generates the serial `id` that names the new
;; AllOfUsArrays table, so the workload row must exist before the table
;; can be created and linked back to it.
(defn ^:private make-new-aou-workload!
"Use transaction `tx` to record a new `workload` and return its `id`."
[tx workload]
;; Insert the workload row first to obtain its serial :id.
(let [id (->> workload (jdbc/insert! tx :workload) first :id)
;; Per-workload table name, e.g. "AllOfUsArrays_000000023".
table (format "%s_%09d" pipeline id)
;; Dedicated sequence backing that table's id column.
table_seq (format "%s_id_seq" table)
;; Tag the new workload row with the pipeline enum value.
kind (format (str/join \space ["UPDATE workload"
"SET pipeline = '%s'::pipeline"
"WHERE id = '%s'"]) pipeline id)
index (format "CREATE SEQUENCE %s AS bigint" table_seq)
;; Typed table (CREATE TABLE ... OF) keyed on the sample identity.
;; NOTE(review): assumes a composite type named after `pipeline`
;; already exists in this schema -- confirm against the migrations.
work (format (str/join \space ["CREATE TABLE %s OF %s"
"(PRIMARY KEY"
"(analysis_version_number,"
"chip_well_barcode),"
"id WITH OPTIONS NOT NULL"
"DEFAULT nextval('%s'))"])
table pipeline table_seq)
;; Tie the sequence's lifetime to the table's id column.
alter (format "ALTER SEQUENCE %s OWNED BY %s.id" table_seq table)]
(jdbc/db-do-commands tx [kind index work alter])
;; Point the workload row at its newly created items table.
(jdbc/update! tx :workload {:items table} ["id = ?" id])
id))

(defn ^:private add-aou-workload!
"Use transaction `tx` to find a workload matching `request`, or make a
new one, and return the workload's `id`. "
[tx request]
(let [{:keys [creator executor pipeline project output watchers]} request
slashified (util/slashify output)
{:keys [release path]} workflow-wdl
query-string (str/join \space ["SELECT * FROM workload"
"WHERE stopped is null"
"AND project = ?"
"AND pipeline = ?::pipeline"
"AND release = ?"
"AND output = ?"])
workloads (jdbc/query tx [query-string project pipeline
release slashified])
n (count workloads)]
(when (> n 1)
(log/error "Too many workloads" :count n :workloads workloads))
(if-let [workload (first workloads)]
(:id workload)
(let [id (->> {:commit commit
:creator creator
:executor executor
:output slashified-output
:project project
:release release
:uuid (UUID/randomUUID)
:version version
:watchers (pr-str watchers)
:wdl path}
(jdbc/insert! tx :workload)
first :id)
table (format "%s_%09d" pipeline id)
table_seq (format "%s_id_seq" table)
kind (format (str/join \space
["UPDATE workload"
"SET pipeline = '%s'::pipeline"
"WHERE id = '%s'"]) pipeline id)
idx (format "CREATE SEQUENCE %s AS bigint" table_seq)
work (format (str/join \space
["CREATE TABLE %s OF %s"
"(PRIMARY KEY"
"(analysis_version_number,"
"chip_well_barcode),"
"id WITH OPTIONS"
"NOT NULL"
"DEFAULT nextval('%s'))"])
table pipeline table_seq)
link-idx-work (format "ALTER SEQUENCE %s OWNED BY %s.id"
table_seq table)]
(jdbc/db-do-commands tx [kind idx work link-idx-work])
(jdbc/update! tx :workload {:items table} ["id = ?" id])
id))))
(let [{:keys [commit version]} (wfl/get-the-version)]
(make-new-aou-workload! tx {:commit commit
:creator creator
:executor executor
:output slashified
:project project
:release release
:uuid (UUID/randomUUID)
:version version
:watchers (pr-str watchers)
:wdl path})))))

(defn start-aou-workload!
(defn ^:private start-aou-workload!
"Use transaction `tx` to start `workload` so it becomes append-able."
[tx {:keys [id] :as workload}]
(if (:started workload)
workload
(let [now {:started (Timestamp/from (.toInstant (OffsetDateTime/now)))}]
(let [now {:started (Timestamp/from (Instant/now))}]
(jdbc/update! tx :workload now ["id = ?" id])
(merge workload now))))

(def primary-keys
"These uniquely identify a sample so use them as a primary key."
[:chip_well_barcode :analysis_version_number])

(defn ^:private primary-values [sample]
(mapv sample primary-keys))

Expand Down Expand Up @@ -267,29 +262,33 @@
(second (reduce go [known-keys []] samples))))

(defn append-to-workload!
"Use transaction `tx` to append `notifications` (or samples) to `workload`.
Note:
- The `workload` must be `started` in order to be append-able.
- All samples being appended will be submitted immediately."
"Use transaction `tx` to add `notifications` (samples) to `workload`.
Note: - The `workload` must be `started` in order to be append-able.
- All samples being appended will be submitted immediately."
[tx notifications {:keys [uuid items output executor] :as workload}]
(when-not (:started workload)
(throw (Exception. (format "Workload %s is not started" uuid))))
(when (:stopped workload)
(throw (Exception. (format "Workload %s has been stopped" uuid))))
(letfn [(submit! [url sample]
(let [output-path (str output (str/join "/" (primary-values sample)))
workflow-options (util/deep-merge default-options
{:final_workflow_outputs_dir output-path})]
(->> (submit-aou-workflow url sample workflow-options {:workload uuid})
(let [output-path (str output
(str/join "/" (primary-values sample)))
workflow-options (util/deep-merge
default-options
{:final_workflow_outputs_dir output-path})]
(->> {:workload uuid}
(submit-aou-workflow url sample workflow-options)
str ; coerce java.util.UUID -> string
(assoc (select-keys sample primary-keys)
:updated (Timestamp/from (.toInstant (OffsetDateTime/now)))
:updated (Timestamp/from (Instant/now))
:status "Submitted"
:uuid))))]
(let [executor (is-known-cromwell-url? executor)
(let [executor (is-known-cromwell-url? executor)
submitted-samples (map (partial submit! executor)
(remove-existing-samples notifications
(get-existing-samples tx items notifications)))]
(remove-existing-samples
notifications
(get-existing-samples
tx items notifications)))]
(jdbc/insert-multi! tx items submitted-samples)
submitted-samples)))

Expand Down
9 changes: 5 additions & 4 deletions api/src/wfl/module/copyfile.clj
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@
(util/deep-merge default-options options)
{:workload uuid})])
(update! [tx [id uuid]]
(jdbc/update! tx items
{:updated (OffsetDateTime/now) :uuid uuid :status "Submitted"}
["id = ?" id]))]
(run! (comp (partial update! tx) submit!) (workloads/workflows tx workload))
(jdbc/update! tx items {:status "Submitted"
:updated (OffsetDateTime/now)
:uuid uuid} ["id = ?" id]))]
(run! (comp (partial update! tx) submit!)
(workloads/workflows tx workload))
(jdbc/update! tx :workload
{:started (OffsetDateTime/now)} ["uuid = ?" uuid]))))

Expand Down
41 changes: 34 additions & 7 deletions api/src/wfl/module/sg.clj
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@
:sample_alias
:version]))))

(defn final_workflow_outputs_dir_hack ; for testing
"Do to `file` what `{:final_workflow_outputs_dir output}` does."
(defn ^:private final_workflow_outputs_dir_hack
"Do to `file` what Cromwell's `{:final_workflow_outputs_dir output}` does."
[output file]
(->> (str/split file #"/")
(drop 3)
Expand All @@ -153,14 +153,41 @@
(log/warn "Need output files for Clio.")
(log/error {:need need}))))

;; This hack depends on how Clio spells error messages.
;;
(defn ^:private hack-try-increment-version-in-clio-add-bam?
"True when `exception` suggests that `clio-add-bam` might succeed
with the version incremented."
[exception]
;; NOTE(review): assumes ex-data carries an HTTP-client-style map with
;; :status, :reason-phrase, and :body -- confirm against the Clio client.
(let [{:keys [body reason-phrase status]} (ex-data exception)]
(and
;; NOTE(review): if `exception` carries no ex-data, `status` is nil and
;; (== 400 nil) throws NullPointerException -- verify callers only pass
;; ExceptionInfo here, or guard on status first.
(== 400 status)
(= "Bad Request" reason-phrase)
;; Match Clio's overwrite-conflict message: its fixed prefix ...
(str/starts-with?
body
"Adding this document will overwrite the following existing metadata:")
;; ... and its fixed "force=true" suffix.
(str/ends-with?
body
"Use 'force=true' to overwrite the existing data."))))

(defn ^:private hack-clio-add-bam-with-version-incremented
"Attempt to add `bam` record to `clio` with version `increment`ed.
Retries recursively on Clio overwrite conflicts; gives up (throws)
once `increment` exceeds 2."
[clio bam increment]
;; Bound the retries: only increments 0, 1, and 2 are attempted.
(when (> increment 2)
(throw (ex-info "Cannot update Clio" {:bam bam :clio clio})))
(try (clio/add-bam clio (update bam :version + increment))
(catch Throwable x
(log/warning {:bam bam :x x})
;; Retry with the next version only for Clio's overwrite-conflict
;; error; rethrow anything else unchanged.
(if (hack-try-increment-version-in-clio-add-bam? x)
(hack-clio-add-bam-with-version-incremented clio bam (inc increment))
(throw x)))))

(defn ^:private clio-add-bam
"Add `bam` record to `clio`."
"Add `bam` record to `clio`, and maybe retry after incrementing :version."
[clio bam]
(try (clio/add-bam clio bam)
(catch Throwable x
(log/error {:bam bam :x x}))))
(hack-clio-add-bam-with-version-incremented clio bam 0))

(defn maybe-update-clio-and-write-final-files
(defn ^:private maybe-update-clio-and-write-final-files
"Maybe update `clio-url` with `final` and write files and `metadata`."
[clio-url final {:keys [inputs] :as metadata}]
#_(log-missing-final-files-for-debugging final)
Expand Down
18 changes: 4 additions & 14 deletions api/src/wfl/sink.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
[wfl.service.rawls :as rawls]
[wfl.stage :as stage]
[wfl.util :as util :refer [utc-now]])
(:import [clojure.lang ExceptionInfo]
[wfl.util UserException]))
(:import [wfl.util UserException]))

(defmulti create-sink!
"Create a `Sink` instance using the database `transaction` and configuration
Expand Down Expand Up @@ -102,14 +101,14 @@
(assoc :type terra-workspace-sink-type))
(throw (ex-info "Invalid sink_items" {:workload workload}))))

(def unknown-entity-type-error-message
(def ^:private unknown-entity-type-error-message
"The entityType was not found in workspace.")

(def terra-workspace-malformed-from-outputs-message
(def ^:private terra-workspace-malformed-from-outputs-message
(str/join " " ["fromOutputs must define a mapping from workflow outputs"
"to the attributes of entityType."]))

(def unknown-attributes-error-message
(def ^:private unknown-attributes-error-message
(str/join " " ["Found additional attributes in fromOutputs that are not"
"present in the entityType."]))

Expand Down Expand Up @@ -169,15 +168,6 @@
{:fromOutputs fromOutputs :workflow workflow}
cause)))))

(defn ^:private entity-exists?
"True when the `entity` exists in the `workspace`."
[workspace entity]
(try
(firecloud/get-entity workspace entity)
(catch ExceptionInfo ex
(when (not= (-> ex ex-data :status) 404)
(throw ex)))))

(def ^:private entity-name-not-found-error-message
(str/join \space ["Entity name not found:"
"sink.identifer not in workflow outputs or inputs"]))
Expand Down
Loading

0 comments on commit a35cd10

Please sign in to comment.