diff --git a/config/cluster/slurm.json b/config/cluster/slurm.json index 0010f18..492641b 100644 --- a/config/cluster/slurm.json +++ b/config/cluster/slurm.json @@ -73,11 +73,30 @@ "gres": "lscratch:512" }, "deepsomatic": { - "threads": "24", - "mem": "64G", + "threads": "36", + "mem": "192G", "time": "1-18:00:00", "gres": "lscratch:750" }, + "deepsomatic_make_examples": { + "threads": "36", + "mem": "96G", + "time": "1-00:00:00", + "gres": "lscratch:750" + }, + "deepsomatic_call_variants": { + "threads": "16", + "mem": "60G", + "partition": "gpu", + "gres": "gpu:a100:1,lscratch:450", + "time": "1-00:00:00" + }, + "deepsomatic_postprocess_variants": { + "threads": "4", + "mem": "64G", + "time": "1-00:00:00", + "gres": "lscratch:256" + }, "deepvariant": { "threads": "18", "mem": "48G", diff --git a/config/cluster/uge.json b/config/cluster/uge.json index 671a18d..05c8a42 100644 --- a/config/cluster/uge.json +++ b/config/cluster/uge.json @@ -75,6 +75,21 @@ "mem": "4G", "partition": "" }, + "deepsomatic_call_variants": { + "mem": "4G", + "partition": "", + "threads": "8" + }, + "deepsomatic_make_examples": { + "mem": "4G", + "partition": "", + "threads": "8" + }, + "deepsomatic_postprocess_variants": { + "mem": "8G", + "partition": "", + "threads": "4" + }, "deepvariant": { "mem": "3G", "partition": "", diff --git a/workflow/rules/depreciated.smk b/workflow/rules/depreciated.smk index 09839fc..fdf0fff 100644 --- a/workflow/rules/depreciated.smk +++ b/workflow/rules/depreciated.smk @@ -1,4 +1,17 @@ # Depreciated rules that may still be useful for some projects +def get_normal_sorted_bam(wildcards): + """ + Returns a tumor samples paired normal + See config['pairs'] for tumor, normal pairs. + """ + normal = tumor2normal[wildcards.name] + if normal: + # Runs in a tumor, normal mode + return join(workpath, "BAM", "{0}.sorted.bam".format(normal)) + else: + # Runs in tumor-only mode + return [] + # Depreciated germline variant calling rule(s) rule deepvariant: @@ -57,4 +70,83 @@ rule deepvariant: --output_vcf={output.vcf} \\ --num_shards={threads} \\ --intermediate_results_dir=${{tmp}} - """ \ No newline at end of file + """ + +# Depreciated somatic variant calling rule(s) +rule deepsomatic: + """ + Data processing step to call somatic variants using deep neural + network in tumor-normal pairs. DeepSomatic is an extension of the + deep learning-based variant caller DeepVariant that takes aligned + reads (in BAM or CRAM format) from tumor and normal data, produces + pileup image tensors from them, classifies each tensor using a CNN, + and finally reports somatic variants in a standard VCF or gVCF file. + This rule runs all three steps in the deepsomatic pipeline as a one + step: i.e. make_examples, call_variants, and postprocess_variants. + This is not optimal for large-scale projects as it will consume a lot + of resources inefficently (only the 2nd step in the dv pipeline can + make use of GPU-computing). As so, it is better to run the 1st/3rd + step on a normal compute node and run the 2nd step on a GPU node. + @Input: + Duplicate marked, sorted Tumor-Normal BAM file (scatter) + @Output: + Single-sample VCF file with called somatic variants + """ + input: + tumor = join(workpath, "BAM", "{name}.sorted.bam"), + normal = get_normal_sorted_bam + output: + vcf = join(workpath, "deepsomatic", "somatic", "{name}.deepsomatic.vcf"), + params: + rname = "deepsom", + genome = config['references']['GENOME'], + tmpdir = tmpdir, + # Building option for deepsomatic config, where: + # @WGS = --model_type=WGS + # @WES = --model_type=WES (may be added in future) + dv_model_type = "WGS", + # Get tumor and normal sample names + tumor = '{name}', + # Building option for the paired normal sorted bam + normal_bam_option = lambda w: "--reads_normal={0}.sorted.bam".format( + join(workpath, "BAM", tumor2normal[w.name]) + ) if tumor2normal[w.name] else "", + # Building option for the normal sample name + normal_name_option = lambda w: "--sample_name_normal={0}".format( + tumor2normal[w.name] + ) if tumor2normal[w.name] else "", + threads: int(allocated("threads", "deepsomatic", cluster)) + container: config['images']['deepsomatic'] + envmodules: config['tools']['deepsomatic'] + shell: """ + # Setups temporary directory for + # intermediate files with built-in + # mechanism for deletion on exit + if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi + tmp=$(mktemp -d -p "{params.tmpdir}") + trap 'du -sh "${{tmp}}"; rm -rf "${{tmp}}"' EXIT + + # Export OpenBLAS variable to + # control the number of threads + # in a thread pool. By setting + # this variable to 1, work is + # done in the thread that ran + # the operation, rather than + # disbatching the work to a + # thread pool. If this option + # is not provided, it can lead + # to nested parallelism. + # See this issue for more info: + # https://github.com/google/deepsomatic/issues/28 + export OPENBLAS_NUM_THREADS=1 + + # Run deepsomatic + run_deepsomatic \\ + --model_type={params.dv_model_type} \\ + --ref={params.genome} \\ + --reads_tumor={input.tumor} {params.normal_bam_option} \\ + --sample_name_tumor={params.tumor} {params.normal_name_option} \\ + --output_vcf={output.vcf} \\ + --num_shards={threads} \\ + --intermediate_results_dir=${{tmp}} + """ diff --git a/workflow/rules/somatic.smk b/workflow/rules/somatic.smk index 3c7398d..0564b95 100644 --- a/workflow/rules/somatic.smk +++ b/workflow/rules/somatic.smk @@ -744,49 +744,61 @@ rule clairs_tumor_only: """ -rule deepsomatic: +rule deepsomatic_make_examples: """ Data processing step to call somatic variants using deep neural - network in tumor-normal pairs. DeepSomatic is an extension of the - deep learning-based variant caller DeepVariant that takes aligned - reads (in BAM or CRAM format) from tumor and normal data, produces - pileup image tensors from them, classifies each tensor using a CNN, - and finally reports somatic variants in a standard VCF or gVCF file. - This rule runs all three steps in the deepsomatic pipeline as a one - step: i.e. make_examples, call_variants, and postprocess_variants. - This is not optimal for large-scale projects as it will consume a lot - of resources inefficently (only the 2nd step in the dv pipeline can - make use of GPU-computing). As so, it is better to run the 1st/3rd - step on a normal compute node and run the 2nd step on a GPU node. + network. The make_examples step prepares the input data for the + deepsomatic's CNN. DeepSomatic is an extension of deep learning- + based variant caller Deepvariant. It is composed of multiple steps + that takes aligned reads (in BAM or CRAM format), produces pileup + image tensors from them, classifies each tensor using a convolutional + neural network, and finally reports the results in a standard VCF or + gVCF file. This rule is the first step in the deepsomatic pipeline: + * 1. make_examples (CPU, parallelizable with gnu-parallel) + 2. call_variants (GPU, use a GPU node) + 3. postprocess_variants (CPU, single-threaded) + Running deepsomatic in a single step using run_deepsomatic is not + optimal for large-scale projects as it will consume resources very + inefficently. As so, it is better to run the 1st/3rd step on a + compute node and run the 2nd step on a GPU node. @Input: - Duplicate marked, sorted Tumor-Normal BAM file (scatter) + Duplicate marked, sorted BAM file (scatter) @Output: - Single-sample VCF file with called somatic variants + Flag file to indicate success of make_examples """ input: tumor = join(workpath, "BAM", "{name}.sorted.bam"), normal = get_normal_sorted_bam output: - vcf = join(workpath, "deepsomatic", "somatic", "{name}.deepsomatic.vcf"), + success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"), params: - rname = "deepsom", + rname = "ds_mkexamples", genome = config['references']['GENOME'], tmpdir = tmpdir, - # Building option for deepsomatic config, where: - # @WGS = --model_type=WGS - # @WES = --model_type=WES (may be added in future) - dv_model_type = "WGS", + nshards = int(allocated("threads", "deepsomatic_make_examples", cluster))-1, + example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format( + w.name, + int(allocated("threads", "deepsomatic_make_examples", cluster)) + )), + # TODO: add option --ffpe option to pipeline, that + # selects either the ffpe_wgs or ffpe_wes checkpoints. + # Building option for checkpoint file (assumes TN-pairs and + # non-FFPE samples), where: + # @WES = "/opt/models/deepsomatic/wes" + # @WGS = "/opt/models/deepsomatic/wgs" + ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs", # Get tumor and normal sample names tumor = '{name}', # Building option for the paired normal sorted bam - normal_bam_option = lambda w: "--reads_normal={0}.sorted.bam".format( + normal_bam_option = lambda w: "--reads_normal {0}.sorted.bam".format( join(workpath, "BAM", tumor2normal[w.name]) ) if tumor2normal[w.name] else "", # Building option for the normal sample name - normal_name_option = lambda w: "--sample_name_normal={0}".format( + normal_name_option = lambda w: "--sample_name_normal {0}".format( tumor2normal[w.name] ) if tumor2normal[w.name] else "", - threads: int(allocated("threads", "deepsomatic", cluster)) + message: "Running DeepSomatic make_examples on '{input.tumor}' input file" + threads: int(allocated("threads", "deepsomatic_make_examples", cluster)) container: config['images']['deepsomatic'] envmodules: config['tools']['deepsomatic'] shell: """ @@ -795,31 +807,170 @@ rule deepsomatic: # mechanism for deletion on exit if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi tmp=$(mktemp -d -p "{params.tmpdir}") - trap 'du -sh "${{tmp}}"; rm -rf "${{tmp}}"' EXIT - - # Export OpenBLAS variable to - # control the number of threads - # in a thread pool. By setting - # this variable to 1, work is - # done in the thread that ran - # the operation, rather than - # disbatching the work to a - # thread pool. If this option - # is not provided, it can lead - # to nested parallelism. - # See this issue for more info: - # https://github.com/google/deepsomatic/issues/28 + trap 'rm -rf "${{tmp}}"' EXIT + echo "Using tmpdir: ${{tmp}}" + export TMPDIR="${{tmp}}" + + # Run DeepSomatic make_examples and + # parallelize it using gnu-parallel. + # Exporting OpenBLAS variable to avoid + # any issues related to RLIMIT_NPROC, + # that issue will occur when the number + # of shards is >= 32. Exporting this + # OpenBLAS variable to also controls + # the number of threads in a thread + # pool. By setting this variable to 1, + # work is done in the thread that ran + # the operation, rather than disbatching + # the work to a thread pool. If this + # variable is not provided, it can lead + # to issues related to nested parallelism. + # See this Github issue for more info: + # https://github.com/google/deepsomatic/issues/28 export OPENBLAS_NUM_THREADS=1 + time seq 0 {params.nshards} \\ + | parallel \\ + --eta \\ + -q \\ + --halt 2 \\ + --line-buffer \\ + make_examples_somatic \\ + --mode calling \\ + --ref {params.genome} \\ + --reads_tumor {input.tumor} {params.normal_bam_option} \\ + --sample_name_tumor {params.tumor} {params.normal_name_option} \\ + --examples {params.example} \\ + --checkpoint "{params.ckpt}" \\ + --vsc_max_fraction_indels_for_non_target_sample "0.5" \\ + --vsc_max_fraction_snps_for_non_target_sample "0.5" \\ + --vsc_min_fraction_indels "0.05" \\ + --vsc_min_fraction_snps "0.029" \\ + --task {{}} \\ + && touch {output.success} + """ + + +rule deepsomatic_call_variants: + """ + Data processing step to call somatic variants using deep neural + network. The make_examples step prepares the input data for the + deepsomatic's CNN. DeepSomatic is an extension of deep learning- + based variant caller Deepvariant. It is composed of multiple steps + that takes aligned reads (in BAM or CRAM format), produces pileup + image tensors from them, classifies each tensor using a convolutional + neural network, and finally reports the results in a standard VCF or + gVCF file. This rule is the first step in the deepsomatic pipeline: + 1. make_examples (CPU, parallelizable with gnu-parallel) + * 2. call_variants (GPU, use a GPU node) + 3. postprocess_variants (CPU, single-threaded) + Running deepsomatic in a single step using run_deepsomatic is not + optimal for large-scale projects as it will consume resources very + inefficently. As so, it is better to run the 1st/3rd step on a + compute node and run the 2nd step on a GPU node. + @Input: + Flag file to indicate success of make_examples (scatter) + @Output: + Per sample call_variants tensorflow records file + """ + input: + success = join(workpath, "deepsomatic", "mk_examples", "{name}.make_examples.success"), + output: + callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"), + params: + rname = "ds_callvars", + genome = config['references']['GENOME'], + tmpdir = tmpdir, + # NOTE: There BE dragons here! + # We need allocation info from make_examples rule + # to determine the number of shards that were + # used in the make_examples step, this is used + # to resolve a dependency file of this rule, + # which is the examples tf record file produced by + # make_examples. This file gets passed to the + # --examples option of call_variants. + example = lambda w: join(workpath, "deepsomatic", "mk_examples", "{0}.make_examples.tfrecord@{1}.gz".format( + w.name, + int(allocated("threads", "deepsomatic_make_examples", cluster)) + )), + # TODO: add option --ffpe option to pipeline, that + # selects either the ffpe_wgs or ffpe_wes checkpoints. + # Building option for checkpoint file (assumes TN-pairs and + # non-FFPE samples), where: + # @WES = "/opt/models/deepsomatic/wes" + # @WGS = "/opt/models/deepsomatic/wgs" + ckpt = lambda _: "/opt/models/deepsomatic/wes" if run_wes else "/opt/models/deepsomatic/wgs", + message: "Running deepsomatic call_variants on '{wildcards.name}' sample" + threads: int(allocated("threads", "deepsomatic_call_variants", cluster)) + container: config['images']['deepsomatic'] + envmodules: config['tools']['deepsomatic'] + shell: """ + # Setups temporary directory for + # intermediate files with built-in + # mechanism for deletion on exit + if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi + tmp=$(mktemp -d -p "{params.tmpdir}") + trap 'rm -rf "${{tmp}}"' EXIT + echo "Using tmpdir: ${{tmp}}" + export TMPDIR="${{tmp}}" + + # Run DeepSomatic call_variants + time call_variants \\ + --outfile {output.callvar} \\ + --examples {params.example} \\ + --checkpoint {params.ckpt} + """ + + +rule deepsomatic_postprocess_variants: + """ + Data processing step to call somatic variants using deep neural + network. The make_examples step prepares the input data for the + deepsomatic's CNN. DeepSomatic is an extension of deep learning- + based variant caller Deepvariant. It is composed of multiple steps + that takes aligned reads (in BAM or CRAM format), produces pileup + image tensors from them, classifies each tensor using a convolutional + neural network, and finally reports the results in a standard VCF or + gVCF file. This rule is the first step in the deepsomatic pipeline: + 1. make_examples (CPU, parallelizable with gnu-parallel) + 2. call_variants (GPU, use a GPU node) + * 3. postprocess_variants (CPU, single-threaded) + Running deepsomatic in a single step using run_deepsomatic is not + optimal for large-scale projects as it will consume resources very + inefficently. As so, it is better to run the 1st/3rd step on a + compute node and run the 2nd step on a GPU node. + @Input: + Per-sample call_variants tensorflow records file (scatter) + @Output: + Single-sample gVCF file with called variants + """ + input: + callvar = join(workpath, "deepsomatic", "call_variants", "{name}.call_variants.tfrecord.gz"), + output: + vcf = join(workpath, "deepsomatic", "somatic", "{name}.deepsomatic.vcf"), + params: + rname = "ds_postprovars", + genome = config['references']['GENOME'], + tmpdir = tmpdir, + message: "Running DeepSomatic postprocess_variants on '{input.callvar}' input file" + threads: int(allocated("threads", "deepsomatic_postprocess_variants", cluster)) + container: config['images']['deepsomatic'] + envmodules: config['tools']['deepsomatic'] + shell: """ + # Setups temporary directory for + # intermediate files with built-in + # mechanism for deletion on exit + if [ ! -d "{params.tmpdir}" ]; then mkdir -p "{params.tmpdir}"; fi + tmp=$(mktemp -d -p "{params.tmpdir}") + trap 'rm -rf "${{tmp}}"' EXIT + echo "Using tmpdir: ${{tmp}}" + export TMPDIR="${{tmp}}" - # Run deepsomatic - run_deepsomatic \\ - --model_type={params.dv_model_type} \\ - --ref={params.genome} \\ - --reads_tumor={input.tumor} {params.normal_bam_option} \\ - --sample_name_tumor={params.tumor} {params.normal_name_option} \\ - --output_vcf={output.vcf} \\ - --num_shards={threads} \\ - --intermediate_results_dir=${{tmp}} + # Run DeepSomatic postprocess_variants + time postprocess_variants \\ + --ref {params.genome} \\ + --infile {input.callvar} \\ + --outfile {output.vcf} \\ + --process_somatic=true """