From 0bbe6c93103c727dd5ae579b35a7f42fea87520d Mon Sep 17 00:00:00 2001
From: Erin Weisbart <54687786+ErinWeisbart@users.noreply.github.com>
Date: Tue, 5 Nov 2024 13:29:35 -0800
Subject: [PATCH] reformat run_batch_general to CLI, add CPG structure (#179)

---
 .../DCP-documentation/step_2_submit_jobs.md |  75 +-
 run_batch_general.py                        | 836 ++++++++++++++----
 2 files changed, 712 insertions(+), 199 deletions(-)

diff --git a/documentation/DCP-documentation/step_2_submit_jobs.md b/documentation/DCP-documentation/step_2_submit_jobs.md
index 0f413e8..f26dfc8 100644
--- a/documentation/DCP-documentation/step_2_submit_jobs.md
+++ b/documentation/DCP-documentation/step_2_submit_jobs.md
@@ -38,14 +38,14 @@ Job files that don't include it will use the default structure.
 For large numbers of groups, it may be helpful to create this list separately as a .txt file you can then append into the job's JSON file.
 You may create this yourself in your favorite scripting language.
 Alternatively, you can use the following additional tools to help you create and format this list:
-  * `batches.sh` allows you to provide a list of all the individual metadata components (plates, columns, rows, etc). 
+  * `batches.sh` allows you to provide a list of all the individual metadata components (plates, columns, rows, etc).
   It then uses [GNU parallel](https://www.gnu.org/software/parallel/parallel_tutorial.html) to create a formatted text file with all the possible combinations of the components you provided.
   This approach is best when you have a large number of groups and the group structure is uniform.
   Example: for a 96-well plate experiment where there are 3 plates and the experiment is grouped by Plate and Well, `batches.sh` would read:
-  `parallel echo '{\"Metadata\": \"Metadata_Plate={1},Metadata_Well={2}{3}\"},' ::: Plate1 Plate2 Plate3 ::: A B C D E F G H ::: 01 02 03 04 05 06 07 08 09 10 11 12 | sort > batches.txt` 
-  * You may also use the list of groupings created by calling `cellprofiler --print-groups` from the command line (see [here](https://github.com/CellProfiler/CellProfiler/wiki/Adapting-CellProfiler-to-a-LIMS-environment#cmd) and [here](https://github.com/CellProfiler/Distributed-CellProfiler/issues/52) for more information). 
-  Note that for job files that specify groupings in this way, the `output_structure` variable is NOT optional - it must be specified or an error will be returned. 
+  `parallel echo '{\"Metadata\": \"Metadata_Plate={1},Metadata_Well={2}{3}\"},' ::: Plate1 Plate2 Plate3 ::: A B C D E F G H ::: 01 02 03 04 05 06 07 08 09 10 11 12 | sort > batches.txt`
+  * You may also use the list of groupings created by calling `cellprofiler --print-groups` from the command line (see [here](https://github.com/CellProfiler/CellProfiler/wiki/Adapting-CellProfiler-to-a-LIMS-environment#cmd) and [here](https://github.com/CellProfiler/Distributed-CellProfiler/issues/52) for more information).
+  Note that for job files that specify groupings in this way, the `output_structure` variable is NOT optional - it must be specified or an error will be returned.
 
 ## Alternate job submission: run_batch_general.py
 
@@ -53,3 +53,70 @@ We also support an alternate second path besides `submitJobs` to create the list of jobs to be run: `run_batch_general.py`.
 This file essentially serves as a "shortcut" to run many common types of stereotyped experiments we run in our lab.
 Essentially, if your data follows a regular structure (such as N rows, N columns, N grouping, a particular structure for output, etc.), you may find it useful to take and modify this file for your own usage.
 We recommend new users use the `submitJobs` pathway, as it will help users understand the kinds of information Distributed-CellProfiler needs in order to run properly, but once they are comfortable with it they may find `run_batch_general.py` helps them create jobs faster in the future.
+
+As of Distributed-CellProfiler 2.2.0, `run_batch_general.py` has been reformatted as a CLI tool with greatly enhanced customizability.
+`run_batch_general.py` must be passed 4 pieces of information:
+
+### Required inputs
+
+* `step` is the step that you would like to make jobs for.
+Supported steps are `zproj`, `illum`, `qc`, `qc_persite`, `assaydev`, and `analysis`.
+* `identifier` is the project identifier (e.g. "cpg0000-jump-pilot" or "2024_11_07_Collaborator_Cell_Painting")
+* `batch` is the name of the data batch (e.g. "2020_11_04_CPJUMP1")
+* `platelist` is the list of plates to process.
+Format the list in quotes with individual plates separated by commas and no spaces (e.g. "Plate1,Plate2,Plate3")
+
+A minimal `run_batch_general.py` command may look like:
+
+```bash
+run_batch_general.py analysis 2024_05_16_Segmentation_Project 2024_10_10_Batch1 "Plate1,Plate2,Plate3"
+```
+
+### Required input for Cell Painting Gallery
+
+Runs made off of the Cell Painting Gallery require two additional flags:
+
+* `--source <source>` to specify the identifier-specific source of the data.
+* `--path-style cpg` to set the input and output paths to match how data is structured in the Cell Painting Gallery.
+All paths can be overwritten with flags (see below).
+
+A minimal `run_batch_general.py` command for a dataset on the Cell Painting Gallery may look like:
+
+```bash
+run_batch_general.py analysis cpg0000-jump-pilot 2020_11_04_CPJUMP1 "BR00116991,BR00116992" --path-style cpg --source broad
+```
+
+### Plate layout flags
+
+Example commands using these flags are shown below, after the flag lists.
+
+* `--plate-format <plate format>`: if used, can be `96` or `384` and will overwrite `rows` and `columns` to produce standard 96- or 384-well plate well names (e.g. A01, A02, etc.)
+* `--rows <rows>`: a custom list of row labels.
+Will be combined with `columns` to generate well names.
+Separate values with commas and no spaces and surround with quotation marks (e.g. `"A,B,C,D,E,F,G"`)
+* `--columns <columns>`: a custom list of column labels.
+Will be combined with `rows` to generate well names.
+Separate values with commas and no spaces and surround with quotation marks (e.g. `"1,2,3,4,5,6,7,8,9,10"`)
+* `--wells <wells>`: a custom list of wells.
+Overwrites `rows` and `columns`.
+Separate values with commas and no spaces and surround with quotation marks (e.g. `"C02,D04,E04,N12"`)
+* `--no-well-digit-pad`: formats wells without zero-padding of the well digit (e.g. `A1`, not `A01`).
+Applies to wells generated with `--plate-format` or `--rows` and `--columns`, but not to wells passed with `--wells`.
+* `--sites <sites>`: a custom list of sites (fields of view) to be analyzed.
+Separate values with commas and no spaces and surround with quotation marks (e.g. `"1,2,3,4,5,6"`)
+
+### Overwrite structural defaults
+
+* `--output-structure <output structure>`: overwrite the default output structure
+* `--output-path <output path>`: overwrite the default output path
+* `--input-path <input path>`: overwrite the default path to input files
+
+### Overwrite defaults (for runs using load data .csvs and .cppipe files)
+
+* `--pipeline <pipeline>`: overwrite the default pipeline name
+* `--pipeline-path <pipeline path>`: overwrite the default path to pipelines
+* `--datafile-name <datafile name>`: overwrite the default load data .csv name
+* `--datafile-path <datafile path>`: overwrite the default path to load data files
+
+### Overwrite defaults (for runs using .h5 batch files)
+
+* `--use-batch`: use .h5 batch files instead of load data .csv and .cppipe files
+* `--batchfile-name <batchfile name>`: overwrite the default batchfile name
+* `--batchfile-path <batchfile path>`: overwrite the default path to the batchfile
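+
+### Example commands
+
+As an illustrative sketch (the project, batch, plate, and well names here are hypothetical), a QC run restricted to a custom list of wells might look like:
+
+```bash
+run_batch_general.py qc 2024_05_16_Segmentation_Project 2024_10_10_Batch1 "Plate1,Plate2" --wells "C02,D04,E04"
+```
+
+Similarly, an analysis run on 96-well plates without well digit padding, using an .h5 batch file instead of a .cppipe and load data .csv, might look like:
+
+```bash
+run_batch_general.py analysis 2024_05_16_Segmentation_Project 2024_10_10_Batch1 "Plate1" --plate-format 96 --no-well-digit-pad --use-batch
+```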
diff --git a/run_batch_general.py b/run_batch_general.py
index eb869fc..600427b 100644
--- a/run_batch_general.py
+++ b/run_batch_general.py
@@ -1,213 +1,659 @@
 import json
 import boto3
+import botocore
 import string
-import os
 import posixpath
+import argparse
+
+
 class JobQueue():
-    def __init__(self,name=None):
-        self.sqs = boto3.resource('sqs')
-        self.queue = self.sqs.get_queue_by_name(QueueName=name+'Queue')
+    def __init__(self, name=None):
+        self.sqs = boto3.resource("sqs")
+        try:
+            self.queue = self.sqs.get_queue_by_name(QueueName=name + "Queue")
+        except botocore.exceptions.ClientError as error:
+            # Fail fast with a clear message if the queue has not been created
+            if "NonExistentQueue" in error.response["Error"]["Code"]:
+                print(f"Queue {name}Queue does not exist.")
+                exit()
+            # Re-raise anything other than a missing queue
+            raise
         self.inProcess = -1
         self.pending = -1
 
     def scheduleBatch(self, data):
         msg = json.dumps(data)
         response = self.queue.send_message(MessageBody=msg)
-        print('Batch sent. Message ID:',response.get('MessageId'))
-
-#project specific stuff
-topdirname='' #Project name (should match the folder structure on S3)
-appname='' #Must match config.py (except for step-specific part)
-batchsuffix='' #Batch name (should match the folder structure on S3)
-rows=list(string.ascii_uppercase)[0:16]
-columns=range(1,25)
-sites=range(1,10)
-well_digit_pad = True #Set True to A01 well format name, set False to A1
-platelist=[]
-zprojpipename='Zproj.cppipe'
-illumpipename='illum.cppipe'
-qcpipename='qc.cppipe'
-assaydevpipename='assaydev.cppipe'
-analysispipename='analysis.cppipe'
-batchpipenamezproj='Batch_data_zproj.h5'
-batchpipenameillum='Batch_data_illum.h5'
-batchpipenameqc='Batch_data_qc.h5'
-batchpipenameassaydev='Batch_data_assaydev.h5'
-batchpipenameanalysis='Batch_data_analysis.h5'
-
-#not project specific, unless you deviate from the structure
-startpath=posixpath.join('projects',topdirname)
-pipelinepath=posixpath.join(startpath,os.path.join('workspace/pipelines',batchsuffix))
-zprojoutpath=posixpath.join(startpath,os.path.join(batchsuffix,'images'))
-zprojoutputstructure="Metadata_Plate/Images"
-illumoutpath=posixpath.join(startpath,os.path.join(batchsuffix,'illum'))
-QCoutpath=posixpath.join(startpath,os.path.join('workspace/qc',os.path.join(batchsuffix,'results')))
-assaydevoutpath=posixpath.join(startpath,os.path.join('workspace/assaydev',batchsuffix))
-analysisoutpath=posixpath.join(startpath,os.path.join('workspace/analysis',batchsuffix))
-inputpath=posixpath.join(startpath,os.path.join('workspace/qc',os.path.join(batchsuffix,'rules')))
-datafilepath=posixpath.join(startpath,os.path.join('workspace/load_data_csv',batchsuffix))
-anlysisoutputstructure="Metadata_Plate/analysis/Metadata_Plate-Metadata_Well-Metadata_Site"
-batchpath=posixpath.join(startpath,os.path.join('workspace/batchfiles',batchsuffix)) -csvname = 'load_data.csv' -csv_with_illumname = 'load_data_with_illum.csv' -csv_unprojected_name = 'load_data_unprojected.csv' -#well formatting -if well_digit_pad: - well_format = '%02d' -else: - well_format = '%01d' - -def MakeZprojJobs(batch=False): - zprojqueue = JobQueue(appname+'_Zproj') - for tozproj in platelist: - for eachrow in rows: - for eachcol in columns: - for eachsite in sites: - if not batch: - templateMessage_zproj = {'Metadata': 'Metadata_Plate='+tozproj+',Metadata_Well='+eachrow+well_format %eachcol+',Metadata_Site='+str(eachsite), - 'pipeline': posixpath.join(pipelinepath,zprojpipename), - 'output': zprojoutpath, - 'output_structure': zprojoutputstructure, - 'input': inputpath, - 'data_file': posixpath.join(datafilepath,tozproj,csv_unprojected_name) - } - else: - templateMessage_zproj = {'Metadata': 'Metadata_Plate='+tozproj+',Metadata_Well='+eachrow+well_format %eachcol+',Metadata_Site='+str(eachsite), - 'pipeline': posixpath.join(batchpath,batchpipenamezproj), - 'output': zprojoutpath, - 'output_structure': zprojoutputstructure, - 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenamezproj) - } - - zprojqueue.scheduleBatch(templateMessage_zproj) - - print('Z projection job submitted. Check your queue') - -def MakeIllumJobs(batch=False): - illumqueue = JobQueue(appname+'_Illum') - for toillum in platelist: - if not batch: - templateMessage_illum = {'Metadata': 'Metadata_Plate='+toillum, - 'pipeline': posixpath.join(pipelinepath,illumpipename), - 'output': illumoutpath, - 'input': inputpath, - 'data_file':posixpath.join(datafilepath,toillum,csvname)} + print("Batch sent. Message ID:", response.get("MessageId")) + + +def run_batch_general( + step, # (zproj, illum, qc, qc_persite, assaydev, or analysis) + identifier="", # (e.g. cpg0000-jump-pilot) + batch="", # (e.g. 2020_11_04_CPJUMP1) + platelist=[], # (e.g. ['Plate1','Plate2']) + path_style="default", # ("cpg" or "default") + source="", # (e.g. source_4, broad. Only with path_style=="cpg") + plate_format="", # (96 or 384. Overwrites rows and columns if passed. Not used by illum.) + rows=list(string.ascii_uppercase)[0:16], # (Not used by illum.) + columns=range(1, 25), # (Not used by illum.) + wells="", # (explicitly list wells. Overwrites rows and columns if passed. Not used by illum. e.g. ['B3','C7']) + sites=range(1, 10), # (Not used by illum, qc, or assaydev.) 
+ well_digit_pad=True, # Set True to A01 well format name, set False to A1 + pipeline="", # (overwrite default pipeline names) + pipelinepath="", # (overwrite default path to pipelines) + inputpath="", # (overwrite default path to input files) + outputstructure="", # (overwrite default output structures) + outpath="", # (overwrite default output paths) + csvname="", # (overwrite default load data csv name) + datafilepath="", # (overwrite default path to load data files) + usebatch=False, # (use h5 batch files instead of load data csv and cppipe files) + batchfile="", # (overwrite default batchfile name) + batchpath="", # (overwrite default path to batch files) +): + + # Two default file organization structures: cpg (for Cell Painting Gallery) and default + path_dict = { + "cpg": { + "pipelinepath": posixpath.join( + identifier, source, "workspace", "pipelines", batch + ), + "zprojoutpath": posixpath.join( + identifier, source, "images", batch, "images_projected" + ), + "zprojoutputstructure": "Metadata_Plate", + "illumoutpath": posixpath.join( + identifier, source, "images", batch, "illum" + ), + "QCoutpath": posixpath.join( + identifier, source, "workspace", "qc", batch, "results" + ), + "assaydevoutpath": posixpath.join( + identifier, source, "workspace", "assaydev", batch + ), + "analysisoutpath": posixpath.join( + identifier, source, "workspace", "analysis", batch + ), + "inputpath": posixpath.join( + identifier, source, "workspace", "qc", batch, "rules" + ), + "datafilepath": posixpath.join( + identifier, source, "workspace", "load_data_csv", batch + ), + "batchpath": "", + }, + "default": { + "pipelinepath": posixpath.join( + "projects", identifier, "workspace", "pipelines", batch + ), + "zprojoutpath": posixpath.join("projects", identifier, batch, "images"), + "zprojoutputstructure": "Metadata_Plate", + "illumoutpath": posixpath.join("projects", identifier, batch, "illum"), + "QCoutpath": posixpath.join( + "projects", identifier, "workspace", "qc", batch, "results" + ), + "assaydevoutpath": posixpath.join( + "projects", identifier, "workspace", "assaydev", batch + ), + "analysisoutpath": posixpath.join( + "projects", identifier, "workspace", "analysis", batch + ), + "inputpath": posixpath.join( + "projects", identifier, "workspace", "qc", batch, "rules" + ), + "datafilepath": posixpath.join( + "projects", identifier, "workspace", "load_data_csv", batch + ), + "batchpath": posixpath.join( + "projects", identifier, "workspace", "batchfiles", batch + ), + }, + } + if not pipelinepath: + pipelinepath = path_dict[path_style]["pipelinepath"] + if not batchpath: + batchpath = path_dict[path_style]["batchpath"] + if not inputpath: + inputpath = path_dict[path_style]["inputpath"] + if not datafilepath: + datafilepath = path_dict[path_style]["datafilepath"] + + # Plate formatting + if plate_format: + if int(plate_format) == 384: + rows = list(string.ascii_uppercase)[0:16] + columns = range(1, 25) + elif int(plate_format) == 96: + rows = list(string.ascii_uppercase)[0:8] + columns = range(1, 13) + else: + print(f"Unsupported plate format of {plate_format}.") + if well_digit_pad: + well_format = "02d" + else: + well_format = "01d" + + if step == "zproj": + zprojqueue = JobQueue(f"{identifier}_Zproj") + if not outputstructure: + outputstructure = path_dict[path_style]["zprojoutputstructure"] + if not outpath: + outpath = path_dict[path_style]["zprojoutpath"] + if not usebatch: + if not pipeline: + pipeline = "Zproj.cppipe" + if not csvname: + csvname = "load_data_unprojected.csv" + + for plate 
in platelist: + if all(len(ele) == 0 for ele in wells): + for eachrow in rows: + for eachcol in columns: + for eachsite in sites: + templateMessage_zproj = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join( + datafilepath, plate, csvname + ), + } + zprojqueue.scheduleBatch(templateMessage_zproj) + else: + for eachwell in wells: + for eachsite in sites: + templateMessage_zproj = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join( + datafilepath, plate, csvname + ), + } + zprojqueue.scheduleBatch(templateMessage_zproj) + else: + if not batchfile: + batchfile = "Batch_data_zproj.h5" + for plate in platelist: + if all(len(ele) == 0 for ele in wells): + for eachrow in rows: + for eachcol in columns: + for eachsite in sites: + templateMessage_zproj = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), + } + zprojqueue.scheduleBatch(templateMessage_zproj) + else: + for eachwell in wells: + for eachsite in sites: + templateMessage_zproj = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), + } + zprojqueue.scheduleBatch(templateMessage_zproj) + print("Z projection job submitted. Check your queue") + + elif step == "illum": + illumqueue = JobQueue(f"{identifier}_Illum") + if not outpath: + outpath = path_dict[path_style]["illumoutpath"] + if not usebatch: + if not pipeline: + pipeline = "illum.cppipe" + if not csvname: + csvname = "load_data.csv" + + for plate in platelist: + templateMessage_illum = { + "Metadata": f"Metadata_Plate={plate}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "input": inputpath, + "data_file": posixpath.join(datafilepath, plate, csvname), + } + + illumqueue.scheduleBatch(templateMessage_illum) + else: + if not batchfile: + batchfile = "Batch_data_illum.h5" + for plate in platelist: + templateMessage_illum = { + "Metadata": f"Metadata_Plate={plate}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), + } + illumqueue.scheduleBatch(templateMessage_illum) + + print("Illum job submitted. 
Check your queue")
+
+    elif step == "qc":
+        qcqueue = JobQueue(f"{identifier}_QC")
+        if not outpath:
+            outpath = path_dict[path_style]["QCoutpath"]
+        if not usebatch:
+            if not pipeline:
+                pipeline = "qc.cppipe"
+            if not csvname:
+                csvname = "load_data.csv"
+
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            templateMessage_qc = {
+                                "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}}",
+                                "pipeline": posixpath.join(pipelinepath, pipeline),
+                                "output": outpath,
+                                "input": inputpath,
+                                "data_file": posixpath.join(datafilepath, plate, csvname),
+                            }
+                            qcqueue.scheduleBatch(templateMessage_qc)
+                else:
+                    for eachwell in wells:
+                        templateMessage_qc = {
+                            "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}",
+                            "pipeline": posixpath.join(pipelinepath, pipeline),
+                            "output": outpath,
+                            "input": inputpath,
+                            "data_file": posixpath.join(datafilepath, plate, csvname),
+                        }
+                        qcqueue.scheduleBatch(templateMessage_qc)
         else:
-                    templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+well_format %eachcol,
-                                          'pipeline': posixpath.join(batchpath,batchpipenameqc),
-                                          'output': QCoutpath,
-                                          'input': inputpath,
-                                          'data_file': posixpath.join(batchpath,batchpipenameqc)
+            if not batchfile:
+                batchfile = "Batch_data_qc.h5"
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            templateMessage_qc = {
+                                "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}}",
+                                "pipeline": posixpath.join(batchpath, batchfile),
+                                "output": outpath,
+                                "input": inputpath,
+                                "data_file": posixpath.join(batchpath, batchfile),
+                            }
+                            qcqueue.scheduleBatch(templateMessage_qc)
                else:
+                    for eachwell in wells:
+                        templateMessage_qc = {
+                            "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}",
+                            "pipeline": posixpath.join(batchpath, batchfile),
+                            "output": outpath,
+                            "input": inputpath,
+                            "data_file": posixpath.join(batchpath, batchfile),
+                        }
+                        qcqueue.scheduleBatch(templateMessage_qc)
+
+        print("QC job submitted. 
Check your queue")
+
+    elif step == "qc_persite":
+        qcqueue = JobQueue(f"{identifier}_QC")
+        if not outpath:
+            outpath = path_dict[path_style]["QCoutpath"]
+        if not usebatch:
+            if not pipeline:
+                pipeline = "qc.cppipe"
+            if not csvname:
+                csvname = "load_data.csv"
+
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            for eachsite in sites:
+                                templateMessage_qc = {
+                                    "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}",
+                                    "pipeline": posixpath.join(pipelinepath, pipeline),
+                                    "output": outpath,
+                                    "input": inputpath,
+                                    "data_file": posixpath.join(
+                                        datafilepath, plate, csvname
+                                    ),
+                                }
+                                qcqueue.scheduleBatch(templateMessage_qc)
+                else:
+                    for eachwell in wells:
+                        for eachsite in sites:
+                            templateMessage_qc = {
+                                "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}",
+                                "pipeline": posixpath.join(pipelinepath, pipeline),
+                                "output": outpath,
+                                "input": inputpath,
+                                "data_file": posixpath.join(
+                                    datafilepath, plate, csvname
+                                ),
+                            }
+                            qcqueue.scheduleBatch(templateMessage_qc)
+        else:
+            if not batchfile:
+                batchfile = "Batch_data_qc.h5"
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            for eachsite in sites:
+                                templateMessage_qc = {
+                                    "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}",
+                                    "pipeline": posixpath.join(batchpath, batchfile),
+                                    "output": outpath,
+                                    "input": inputpath,
+                                    "data_file": posixpath.join(batchpath, batchfile),
+                                }
+                                qcqueue.scheduleBatch(templateMessage_qc)
+                else:
+                    for eachwell in wells:
+                        for eachsite in sites:
+                            templateMessage_qc = {
+                                "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}",
+                                "pipeline": posixpath.join(batchpath, batchfile),
+                                "output": outpath,
+                                "input": inputpath,
+                                "data_file": posixpath.join(batchpath, batchfile),
+                            }
+                            qcqueue.scheduleBatch(templateMessage_qc)
+
+        print("QC job submitted. 
Check your queue")
+
+    elif step == "assaydev":
+        assaydevqueue = JobQueue(f"{identifier}_AssayDev")
+        if not outpath:
+            outpath = path_dict[path_style]["assaydevoutpath"]
+        if not usebatch:
+            if not pipeline:
+                pipeline = "assaydev.cppipe"
+            if not csvname:
+                csvname = "load_data_with_illum.csv"
+
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            templateMessage_ad = {
+                                "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}}",
+                                "pipeline": posixpath.join(pipelinepath, pipeline),
+                                "output": outpath,
+                                "input": inputpath,
+                                "data_file": posixpath.join(datafilepath, plate, csvname),
+                            }
+                            assaydevqueue.scheduleBatch(templateMessage_ad)
+                else:
+                    for eachwell in wells:
+                        templateMessage_ad = {
+                            "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}",
+                            "pipeline": posixpath.join(pipelinepath, pipeline),
+                            "output": outpath,
+                            "input": inputpath,
+                            "data_file": posixpath.join(datafilepath, plate, csvname),
+                        }
+                        assaydevqueue.scheduleBatch(templateMessage_ad)
+        else:
+            if not batchfile:
+                batchfile = "Batch_data_assaydev.h5"
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            templateMessage_ad = {
+                                "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}}",
+                                "pipeline": posixpath.join(batchpath, batchfile),
+                                "output": outpath,
+                                "input": inputpath,
+                                "data_file": posixpath.join(batchpath, batchfile),
+                            }
+                            assaydevqueue.scheduleBatch(templateMessage_ad)
+                else:
+                    for eachwell in wells:
+                        templateMessage_ad = {
+                            "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell}",
+                            "pipeline": posixpath.join(batchpath, batchfile),
+                            "output": outpath,
+                            "input": inputpath,
+                            "data_file": posixpath.join(batchpath, batchfile),
+                        }
+                        assaydevqueue.scheduleBatch(templateMessage_ad)
+
+        print("AssayDev job submitted. Check your queue")
+
+    elif step == "analysis":
+        analysisqueue = JobQueue(f"{identifier}_Analysis")
+        if not outputstructure:
+            outputstructure = (
+                "Metadata_Plate/analysis/Metadata_Plate-Metadata_Well-Metadata_Site"
+            )
+        if not outpath:
+            outpath = path_dict[path_style]["analysisoutpath"]
+        if not usebatch:
+            if not pipeline:
+                pipeline = "analysis.cppipe"
+            if not csvname:
+                csvname = "load_data_with_illum.csv"
+            for plate in platelist:
+                if all(len(ele) == 0 for ele in wells):
+                    for eachrow in rows:
+                        for eachcol in columns:
+                            for eachsite in sites:
+                                templateMessage_analysis = {
+                                    "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}",
+                                    "pipeline": posixpath.join(pipelinepath, pipeline),
+                                    "output": outpath,
+                                    "output_structure": outputstructure,
+                                    "input": inputpath,
+                                    "data_file": posixpath.join(
+                                        datafilepath, plate, csvname
+                                    ),
                                 }
-                qcqueue.scheduleBatch(templateMessage_qc)
-
-    print('QC job submitted. 
Check your queue') - -def MakeQCJobs_persite(batch=False): - qcqueue = JobQueue(appname+'_QC') - for toqc in platelist: - for eachrow in rows: - for eachcol in columns: - for eachsite in sites: - if not batch: - templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+well_format %eachcol+',Metadata_Site='+str(eachsite), - 'pipeline': posixpath.join(pipelinepath,qcpipename), - 'output': QCoutpath, - 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toqc,csvname) - } - else: - templateMessage_qc = {'Metadata': 'Metadata_Plate='+toqc+',Metadata_Well='+eachrow+well_format %eachcol+',Metadata_Site='+str(eachsite), - 'pipeline': posixpath.join(batchpath,batchpipenameqc), - 'output': QCoutpath, - 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameqc) - } - - qcqueue.scheduleBatch(templateMessage_qc) - - print('QC job submitted. Check your queue') - -def MakeAssayDevJobs(batch=False): - assaydevqueue = JobQueue(appname+'_AssayDev') - for toad in platelist: - for eachrow in rows: - for eachcol in columns: - if not batch: - templateMessage_ad = {'Metadata': 'Metadata_Plate='+toad+',Metadata_Well='+eachrow+well_format %eachcol, - 'pipeline': posixpath.join(pipelinepath,assaydevpipename), - 'output': assaydevoutpath, - 'input': inputpath, - 'data_file': posixpath.join(datafilepath,toad,csv_with_illumname) - } + analysisqueue.scheduleBatch(templateMessage_analysis) else: - templateMessage_ad = {'Metadata': 'Metadata_Plate='+toad+',Metadata_Well='+eachrow+well_format %eachcol, - 'pipeline': posixpath.join(batchpath,batchpipenameassaydev), - 'output': assaydevoutpath, - 'input': inputpath, - 'data_file': posixpath.join(batchpath,batchpipenameassaydev) + for eachwell in wells: + for eachsite in sites: + templateMessage_analysis = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachwell},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(pipelinepath, pipeline), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join( + datafilepath, plate, csvname + ), + } + analysisqueue.scheduleBatch(templateMessage_analysis) + else: + if not batchfile: + batchfile = "Batch_data_analysis.h5" + for plate in platelist: + if all(len(ele) == 0 for ele in wells): + for eachrow in rows: + for eachcol in columns: + for eachsite in sites: + templateMessage_analysis = { + "Metadata": f"Metadata_Plate={plate},Metadata_Well={eachrow}{int(eachcol):{well_format}},Metadata_Site={str(eachsite)}", + "pipeline": posixpath.join(batchpath, batchfile), + "output": outpath, + "output_structure": outputstructure, + "input": inputpath, + "data_file": posixpath.join(batchpath, batchfile), } - assaydevqueue.scheduleBatch(templateMessage_ad) - - print('AssayDev job submitted. 
Check your queue')
-
-def MakeAnalysisJobs(batch=False):
-    analysisqueue = JobQueue(appname+'_Analysis')
-    for toanalyze in platelist:
-        for eachrow in rows:
-            for eachcol in columns:
-                for eachsite in sites:
-                    if not batch:
-                        templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+well_format %eachcol+',Metadata_Site='+str(eachsite),
-                                                    'pipeline': posixpath.join(pipelinepath,analysispipename),
-                                                    'output': analysisoutpath,
-                                                    'output_structure':anlysisoutputstructure,
-                                                    'input':inputpath,
-                                                    'data_file': posixpath.join(datafilepath,toanalyze,csv_with_illumname)
-                                                    }
-                    else:
-                        templateMessage_analysis = {'Metadata': 'Metadata_Plate='+toanalyze+',Metadata_Well='+eachrow+well_format %eachcol+',Metadata_Site='+str(eachsite),
-                                                    'pipeline': posixpath.join(batchpath,batchpipenameanalysis),
-                                                    'output': analysisoutpath,
-                                                    'output_structure':anlysisoutputstructure,
-                                                    'input':inputpath,
-                                                    'data_file': posixpath.join(batchpath,batchpipenameanalysis)
-                                                    }
-
-                    analysisqueue.scheduleBatch(templateMessage_analysis)
-
-    print('Analysis job submitted. Check your queue')
-
-#MakeZprojJobs(batch=False)
-#MakeIllumJobs(batch=False)
-#MakeQCJobs(batch=False)
-#MakeQCJobs_persite(batch=False)
-#MakeAssayDevJobs(batch=False)
-#MakeAnalysisJobs(batch=False)
+                            analysisqueue.scheduleBatch(templateMessage_analysis)
+
+        print("Analysis job submitted. Check your queue")
+
+    else:
+        print(f"Step {step} not supported.")
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Make jobs for Distributed-CellProfiler"
+    )
+    parser.add_argument(
+        "step",
+        help="Step to make jobs for. Supported steps are zproj, illum, qc, qc_persite, assaydev, analysis",
+    )
+    parser.add_argument("identifier", help="Project identifier")
+    parser.add_argument("batch", help="Name of batch")
+    parser.add_argument("platelist", type=lambda s: list(s.split(",")), help="List of plates to process")
+    parser.add_argument(
+        "--path-style",
+        dest="path_style",
+        default="default",
+        help="Style of input/output path. default or cpg (for Cell Painting Gallery structure).",
+    )
+    parser.add_argument(
+        "--source",
+        dest="source",
+        default="",
+        help="For Cell Painting Gallery, what is the source (nesting under project identifier).",
+    )
+    parser.add_argument(
+        "--plate-format",
+        dest="plate_format",
+        default="",
+        help="Plate format. Supports 384 or 96. Auto-generates rows and columns and will overwrite --rows and --columns.",
+    )
+    parser.add_argument(
+        "--rows",
+        dest="rows",
+        type=lambda s: list(s.split(",")),
+        default="A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P",
+        help="List of rows to process",
+    )
+    parser.add_argument(
+        "--columns",
+        dest="columns",
+        type=lambda s: list(s.split(",")),
+        default="1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24",
+        help="List of columns to process",
+    )
+    parser.add_argument(
+        "--wells",
+        dest="wells",
+        type=lambda s: list(s.split(",")),
+        default="",
+        help="Explicit list of wells to process. 
Will overwrite --rows and --columns.",
+    )
+    parser.add_argument(
+        "--sites",
+        dest="sites",
+        type=lambda s: list(s.split(",")),
+        default="1,2,3,4,5,6,7,8,9",
+        help="List of sites to process",
+    )
+    parser.add_argument(
+        "--no-well-digit-pad",
+        dest="well_digit_pad",
+        action="store_false",
+        default=True,
+        help="Format wells without zero padding (e.g. A1 instead of A01)",
+    )
+    parser.add_argument(
+        "--pipeline",
+        dest="pipeline",
+        default="",
+        help="Name of the pipeline to overwrite defaults of Zproj.cppipe, illum.cppipe, qc.cppipe, assaydev.cppipe, analysis.cppipe.",
+    )
+    parser.add_argument(
+        "--pipeline-path",
+        dest="pipelinepath",
+        default="",
+        help="Overwrite default path to pipelines.",
+    )
+    parser.add_argument(
+        "--input-path",
+        dest="inputpath",
+        default="",
+        help="Overwrite default path to input files.",
+    )
+    parser.add_argument(
+        "--output-structure",
+        dest="outputstructure",
+        default="",
+        help="Overwrites default output structure. Supported for zproj and analysis.",
+    )
+    parser.add_argument(
+        "--output-path",
+        dest="outpath",
+        default="",
+        help="Overwrites default output path.",
+    )
+    parser.add_argument(
+        "--datafile-name",
+        dest="csvname",
+        default="",
+        help="Name of load data .csv. Overwrites defaults of load_data.csv (illum, qc, qc_persite), load_data_with_illum.csv (assaydev, analysis), and load_data_unprojected.csv (zproj).",
+    )
+    parser.add_argument(
+        "--datafile-path",
+        dest="datafilepath",
+        default="",
+        help="Overwrite default path to load data files.",
+    )
+    parser.add_argument(
+        "--use-batch",
+        dest="usebatch",
+        action="store_true",
+        default=False,
+        help="Use CellProfiler h5 batch files instead of separate .cppipe and load_data.csv files. Supported for default, not cpg structure.",
+    )
+    parser.add_argument(
+        "--batchfile-name",
+        dest="batchfile",
+        default="",
+        help="Name of h5 batch file (if using). Overwrites defaults.",
+    )
+    parser.add_argument(
+        "--batchfile-path",
+        dest="batchpath",
+        default="",
+        help="Overwrite default path to h5 batch files.",
+    )
+    args = parser.parse_args()
+    run_batch_general(
+        args.step,
+        path_style=args.path_style,
+        identifier=args.identifier,
+        batch=args.batch,
+        platelist=args.platelist,
+        source=args.source,
+        plate_format=args.plate_format,
+        rows=args.rows,
+        columns=args.columns,
+        wells=args.wells,
+        sites=args.sites,
+        well_digit_pad=args.well_digit_pad,
+        pipeline=args.pipeline,
+        pipelinepath=args.pipelinepath,
+        inputpath=args.inputpath,
+        outputstructure=args.outputstructure,
+        outpath=args.outpath,
+        csvname=args.csvname,
+        datafilepath=args.datafilepath,
+        usebatch=args.usebatch,
+        batchfile=args.batchfile,
+        batchpath=args.batchpath,
+    )