diff --git a/src/main/scala/com/fulcrumgenomics/bam/Bams.scala b/src/main/scala/com/fulcrumgenomics/bam/Bams.scala index 6e1650dc3..578bef1b9 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/Bams.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/Bams.scala @@ -33,10 +33,9 @@ import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter} import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder} import htsjdk.samtools.SamPairUtil.PairOrientation import htsjdk.samtools._ -import htsjdk.samtools.reference.ReferenceSequenceFileWalker +import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFileWalker} import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil} -import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.math.{max, min} /** @@ -246,7 +245,7 @@ object Bams extends LazyLogging { case (Some(Queryname), _) => new SelfClosingIterator(iterator.bufferBetter, () => CloserUtil.close(iterator)) case (_, _) => logger.info(parts = "Sorting into queryname order.") - val progress = ProgressLogger(this.logger, "Records", "sorted") + val progress = ProgressLogger(this.logger, "records", "sorted") val sort = sorter(Queryname, header, maxInMemory, tmpDir) iterator.foreach { rec => progress.record(rec) @@ -412,7 +411,22 @@ object Bams extends LazyLogging { * @param rec the SamRecord to update * @param ref a reference sequence file walker to pull the reference information from */ - def regenerateNmUqMdTags(rec: SamRecord, ref: ReferenceSequenceFileWalker): Unit = { + def regenerateNmUqMdTags(rec: SamRecord, ref: ReferenceSequenceFileWalker): SamRecord = { + if (rec.unmapped) regenerateNmUqMdTags(rec, Map.empty[Int, ReferenceSequence]) else { + val refSeq = ref.get(rec.refIndex) + regenerateNmUqMdTags(rec, Map(refSeq.getContigIndex -> refSeq)) + } + rec + } + + /** + * Ensures that any NM/UQ/MD tags on the read are accurate. If the read is unmapped, any existing + * values are removed. If the read is mapped all three tags will have values regenerated. + * + * @param rec the SamRecord to update + * @param ref a reference sequence file walker to pull the reference information from + */ + def regenerateNmUqMdTags(rec: SamRecord, ref: Map[Int, ReferenceSequence]): SamRecord = { import SAMTag._ if (rec.unmapped) { rec(NM.name()) = null @@ -420,12 +434,15 @@ object Bams extends LazyLogging { rec(MD.name()) = null } else { - val refBases = ref.get(rec.refIndex).getBases + val refBases = ref.getOrElse(rec.refIndex, throw new IllegalArgumentException( + s"Record '${rec.name}' had contig index '${rec.refIndex}', but not found in the input reference map" + )).getBases SequenceUtil.calculateMdAndNmTags(rec.asSam, refBases, true, true) if (rec.quals != null && rec.quals.length != 0) { rec(SAMTag.UQ.name) = SequenceUtil.sumQualitiesOfMismatches(rec.asSam, refBases, 0) } } + rec } /** diff --git a/src/main/scala/com/fulcrumgenomics/bam/ErrorRateByReadPosition.scala b/src/main/scala/com/fulcrumgenomics/bam/ErrorRateByReadPosition.scala index 10eb9579b..9faaeef74 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/ErrorRateByReadPosition.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/ErrorRateByReadPosition.scala @@ -26,7 +26,6 @@ package com.fulcrumgenomics.bam import java.util - import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.api.SamSource import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} @@ -35,9 +34,9 @@ import com.fulcrumgenomics.commons.util.{LazyLogging, SimpleCounter} import com.fulcrumgenomics.fasta.SequenceDictionary import com.fulcrumgenomics.sopt._ import com.fulcrumgenomics.util.Metric.Count -import com.fulcrumgenomics.util.{Metric, ProgressLogger, Rscript, Sequences} +import com.fulcrumgenomics.util.{IntervalListSource, Metric, ProgressLogger, Rscript, Sequences} import com.fulcrumgenomics.vcf.{ByIntervalListVariantContextIterator, VariantMask} -import htsjdk.samtools.SAMRecord +import htsjdk.samtools.{SAMRecord, SAMSequenceDictionary} import htsjdk.samtools.filter.{DuplicateReadFilter, FailsVendorReadQualityFilter, SamRecordFilter, SecondaryOrSupplementaryFilter} import htsjdk.samtools.reference.ReferenceSequenceFileWalker import htsjdk.samtools.util.{IntervalList, SamLocusIterator, SequenceUtil} @@ -46,6 +45,7 @@ import htsjdk.variant.vcf.VCFFileReader import scala.annotation.switch import scala.collection.mutable import scala.util.Failure +import htsjdk.samtools.util.{BufferedLineReader, Interval, IntervalList} @clp(group=ClpGroups.SamOrBam, description = """ @@ -87,6 +87,7 @@ class ErrorRateByReadPosition @arg(flag='r', doc="Reference sequence fasta file.") val ref: PathToFasta, @arg(flag='v', doc="Optional file of variant sites to ignore.") val variants: Option[PathToVcf] = None, @arg(flag='l', doc="Optional list of intervals to restrict analysis to.") val intervals: Option[PathToIntervals] = None, + @arg(flag='x', doc="Optional list of intervals to ignore.") val ignoreIntervals: Option[PathToIntervals] = None, @arg(flag='d', doc="Include duplicate reads, otherwise ignore.") val includeDuplicates: Boolean = false, @arg(flag='m', doc="The minimum mapping quality for a read to be included.") val minMappingQuality: Int = 20, @arg(flag='q', doc="The minimum base quality for a base to be included.") val minBaseQuality: Int = 0, @@ -127,13 +128,25 @@ class ErrorRateByReadPosition } } + /** Builds the option intervals over which analysis is restricted. */ + private def buildIntervalList(dict: SequenceDictionary): Option[IntervalList] = { + val intervalsToKeep = this.intervals.map { path => IntervalListSource(path).toIntervalList.uniqued(false) } + val intervalsToIgnore = this.ignoreIntervals.map { path => IntervalListSource(path).toIntervalList.uniqued(false) } + (intervalsToKeep, intervalsToIgnore) match { + case (None, None) => None // no intervals + case (Some(left), None) => Some(left) // just the intervals to keep + case (Some(left), Some(right)) => Some(IntervalList.subtract(left, right)) // remove the ignore intervals from the keep intervals + case (None, Some(right)) => Some(IntervalList.invert(right)) // invert the ignore intervals + } + } + /** Computes the metrics from the BAM file. */ private[bam] def computeMetrics(in: SamSource = SamSource(input, ref=Some(ref))): Seq[ErrorRateByReadPositionMetric] = { import com.fulcrumgenomics.fasta.Converters.FromSAMSequenceDictionary - val progress = ProgressLogger(logger, verb="Processed", noun="loci", unit=50000) - val ilist = this.intervals.map(p => IntervalList.fromFile(p.toFile).uniqued(false)) - + val progress = ProgressLogger(logger, verb="Processed", noun="loci", unit=50000) val refWalker = new ReferenceSequenceFileWalker(this.ref.toFile) + val dict = refWalker.getSequenceDictionary.fromSam + val ilist = buildIntervalList(dict=dict) val locusIterator = buildSamLocusIterator(in, ilist).iterator() val variantMask = buildVariantMask(variants, ilist, refWalker.getSequenceDictionary.fromSam) @@ -174,7 +187,7 @@ class ErrorRateByReadPosition /** Generates a SamLocusIterator that will traverse over the relevant parts of the BAM. */ private def buildSamLocusIterator(in: SamSource, intervals: Option[IntervalList]): SamLocusIterator = { val iterator = intervals match { - case None => new SamLocusIterator(in.toSamReader) + case None => new SamLocusIterator(in.toSamReader) case Some(i) => new SamLocusIterator(in.toSamReader, i) } diff --git a/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala b/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala index da0d0c129..3f94de957 100644 --- a/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala @@ -25,15 +25,16 @@ package com.fulcrumgenomics.fasta import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFile, ReferenceSequenceFileFactory} import com.fulcrumgenomics.commons.CommonsDef._ +import com.fulcrumgenomics.commons.collection.SelfClosingIterator object ReferenceSequenceIterator { /** Constructs an iterator over a reference sequence from a Path to the FASTA file. */ - def apply(path: PathToFasta, stripComments: Boolean = false) = { + def apply(path: PathToFasta, stripComments: Boolean = false): ReferenceSequenceIterator = { new ReferenceSequenceIterator(ReferenceSequenceFileFactory.getReferenceSequenceFile(path, stripComments, false)) } /** Constructs an iterator over a reference sequence from a ReferenceSequenceFile. */ - def apply(ref: ReferenceSequenceFile) = { + def apply(ref: ReferenceSequenceFile): ReferenceSequenceIterator = { new ReferenceSequenceIterator(ref) } } @@ -45,8 +46,15 @@ object ReferenceSequenceIterator { class ReferenceSequenceIterator private(private val ref: ReferenceSequenceFile) extends Iterator[ReferenceSequence] { // The next reference sequence; will be null if there is no next :( private var nextSequence: ReferenceSequence = ref.nextSequence() + private var isOpen: Boolean = true - override def hasNext: Boolean = this.nextSequence != null + override def hasNext: Boolean = if (this.nextSequence != null) true else { + if (isOpen) { + isOpen = false + ref.close() + } + false + } override def next(): ReferenceSequence = { assert(hasNext, "next() called on empty iterator") diff --git a/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala b/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala index 1c79907d3..26efefe9c 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala @@ -24,20 +24,24 @@ package com.fulcrumgenomics.umi -import java.lang.Math.{max, min} - import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.Bams import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter} import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} +import com.fulcrumgenomics.commons.io.Writer import com.fulcrumgenomics.commons.util.LazyLogging +import com.fulcrumgenomics.fasta.ReferenceSequenceIterator import com.fulcrumgenomics.sopt.{arg, clp} import com.fulcrumgenomics.util.NumericTypes.PhredScore import com.fulcrumgenomics.util.{Io, ProgressLogger} -import htsjdk.samtools.SAMFileHeader.SortOrder +import htsjdk.samtools.SAMFileHeader +import htsjdk.samtools.SAMFileHeader.GroupOrder import htsjdk.samtools.reference.ReferenceSequenceFileWalker import htsjdk.samtools.util.SequenceUtil +import java.io.Closeable +import java.lang.Math.{max, min} + /** Filter values for filtering consensus reads */ private[umi] case class ConsensusReadFilter(minReads: Int, maxReadErrorRate: Double, maxBaseErrorRate: Double) @@ -114,7 +118,9 @@ class FilterConsensusReads @arg(flag='q', doc="The minimum mean base quality across the consensus read.") val minMeanBaseQuality: Option[PhredScore] = None, @arg(flag='s', doc="Mask (make `N`) consensus bases where the AB and BA consensus reads disagree (for duplex-sequencing only).") - val requireSingleStrandAgreement: Boolean = false + val requireSingleStrandAgreement: Boolean = false, + @arg(flag='S', doc="The sort order of the output, if `:none:` then the same as the input.") val sortOrder: Option[SamOrder] = Some(SamOrder.Coordinate), + @arg(flag='l', doc="Load the full reference sequence in memory") val loadFullReference: Boolean = false ) extends FgBioTool with LazyLogging { // Baseline input validation Io.assertReadable(input) @@ -169,12 +175,9 @@ class FilterConsensusReads private val EmptyFilterResult = FilterResult(keepRead=true, maskedBases=0) override def execute(): Unit = { - val in = SamSource(input) - val header = in.header.clone() - header.setSortOrder(SortOrder.coordinate) - val sorter = Bams.sorter(SamOrder.Coordinate, header, maxRecordsInRam=MaxRecordsInMemoryWhenSorting) - val out = SamWriter(output, header) - val progress1 = ProgressLogger(logger, verb="Filtered and masked") + val progress = ProgressLogger(logger, verb="Filtered and masked") + val in = SamSource(input) + val out = buildOutputWriter(in.header) // Go through the reads by template and do the filtering val templateIterator = Bams.templateIterator(in, maxInMemory=MaxRecordsInMemoryWhenSorting) @@ -201,34 +204,126 @@ class FilterConsensusReads keptReads += primaryReadCount totalBases += r1.length + template.r2.map(_.length).getOrElse(0) maskedBases += r1Result.maskedBases + r2Result.maskedBases - sorter += r1 - progress1.record(r1) - template.r2.foreach { r => sorter += r; progress1.record(r) } + out += r1 + progress.record(r1) + template.r2.foreach { r => out += r; progress.record(r) } template.allSupplementaryAndSecondary.foreach { r => val result = filterRecord(r) if (result.keepRead) { - sorter += r - progress1.record(r) + out += r + progress.record(r) } } } } + progress.logLast() - // Then iterate the reads in coordinate order and re-calculate key tags - logger.info("Filtering complete; fixing tags and writing coordinate sorted reads.") - val progress2 = new ProgressLogger(logger, verb="Wrote") - val walker = new ReferenceSequenceFileWalker(ref.toFile) - sorter.foreach { rec => - Bams.regenerateNmUqMdTags(rec, walker) - out += rec - progress2.record(rec) - } - + logger.info("Finalizing the output") in.safelyClose() out.close() - logger.info(f"Output ${keptReads}%,d of ${totalReads}%,d primary consensus reads.") - logger.info(f"Masked ${maskedBases}%,d of ${totalBases}%,d bases in retained primary consensus reads.") + logger.info(f"Output $keptReads%,d of $totalReads%,d primary consensus reads.") + logger.info(f"Masked $maskedBases%,d of $totalBases%,d bases in retained primary consensus reads.") + } + + /** Builds a method to re-generate teh NM/UQ/MD tags based on if we are loading the full reference or not. Also + * returns a method to close the underling reference */ + private def buildRegenerateNmUqMdTags(): (SamRecord => SamRecord, () => Unit) = { + if (loadFullReference) { + logger.info("Loading reference into memory") + val refMap = ReferenceSequenceIterator(ref, stripComments=true).map { ref => ref.getContigIndex -> ref}.toMap + val f = (rec: SamRecord) => Bams.regenerateNmUqMdTags(rec, refMap) + (f, () => ()) + } + else { + logger.warning("Will require coordinate sorting to update tags, try --load-full-reference instead") + val walker = new ReferenceSequenceFileWalker(ref) + val f = (rec: SamRecord) => Bams.regenerateNmUqMdTags(rec, walker) + (f, () => walker.safelyClose()) + } + } + + /** Builds the writer to which filtered records should be written. + * + * The filtered records may be sorted once, twice, or never depending on (a) if the full reference is loaded into + * memory, (b) the order after filtering, and (c) the output order. + * + * The order after filtering is determined as follows: + * 1. If the input order is Queryname, or the input is query grouped, then use the input order. + * 2. Otherwise, Queryname. + * + * The output order is determined as follows: + * 1. The order from the `--sort-order` option. + * 2. Otherwise, the order from the input file, if an order is present. + * 3. Otherwise, the order after filtering. + * + * If the full reference has not been loaded then: + * 1. the filtered records are sorted by coordinate to reset the SAM tags. + * 2. if the output order is coordinate, the records are then written directly to the output, otherwise they are + * re-sorted (for a second time) to the desired output order, and written to the output. + * + * If the full reference has been loaded then: + * 1. if the output order is the same as the order after filtering, the filtered records are written to the output, + * otherwise they re-sorted to the desired output order and written to the output. + + * */ + private def buildOutputWriter(header: SAMFileHeader): Closeable with Writer[SamRecord] = { + val (regenerateNmUqMdTags, refCloseMethod) = buildRegenerateNmUqMdTags() + val outHeader = header.clone() + + // Check if the input will be re-sorted into QueryName, or if the input sort order will be kept + val orderAfterFiltering = SamOrder(header) match { + case Some(order) if order == SamOrder.Queryname || order.groupOrder == GroupOrder.query => order + case None => SamOrder.Queryname + } + + // Get the output order + val outputOrder = this.sortOrder + .orElse(SamOrder(header)) // use the input sort order + .getOrElse(orderAfterFiltering) // use the order after filtering, so no sort occurs + outputOrder.applyTo(outHeader) // remember to apply it + + // Build the writer + val sort = { + if (loadFullReference) { + // If the full reference has been loaded, we need only sort the output if the order after filtering does not + // match the output order. + if (orderAfterFiltering == outputOrder) None else Some(outputOrder) + } + else { + // If the full reference has not been loaded, we will need to coordinate sort to reset + // the tags, then only re-sort in the output order if not in coordinate order. + if (outputOrder == SamOrder.Coordinate) None else Some(outputOrder) + } + } + sort.foreach(o => logger.info(f"Output will be sorted into $o order")) + val writer = SamWriter(output, outHeader, sort=sort, maxRecordsInRam=MaxRecordsInMemoryWhenSorting) + + // Create the final writer based on if the full reference has been loaded, or not + if (loadFullReference) { + new Writer[SamRecord] with Closeable { + override def write(rec: SamRecord): Unit = writer += regenerateNmUqMdTags(rec) + def close(): Unit = { + writer.close() + refCloseMethod() + } + } + } + else { + val progress = ProgressLogger(this.logger, "records", "sorted") + new Writer[SamRecord] with Closeable { + private val _sorter = Bams.sorter(order=SamOrder.Coordinate, header=header, maxRecordsInRam=MaxRecordsInMemoryWhenSorting) + override def write(rec: SamRecord): Unit = { + progress.record(rec) + this._sorter += rec + } + def close(): Unit = { + this._sorter.foreach { rec => writer += regenerateNmUqMdTags(rec) } + writer.close() + refCloseMethod() + } + } + } } /** diff --git a/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala b/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala index e4ee90b2b..aedf26037 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala @@ -27,7 +27,6 @@ package com.fulcrumgenomics.umi import java.util.concurrent.atomic.AtomicLong - import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.{Bams, Template} import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter} @@ -46,7 +45,10 @@ import scala.collection.BufferedIterator import scala.collection.immutable.IndexedSeq import scala.collection.mutable import scala.collection.mutable.ListBuffer +import com.fulcrumgenomics.commons.util.Threads.IterableThreadLocal +import com.fulcrumgenomics.FgBioDef._ +import java.util.concurrent.ForkJoinPool object GroupReadsByUmi { /** A type representing an actual UMI that is a string of DNA sequence. */ @@ -222,7 +224,7 @@ object GroupReadsByUmi { root.descendants.foreach(child => mappings += ((child.umi, id))) }) - mappings.result + mappings.result() } override def assign(rawUmis: Seq[Umi]): Map[Umi, MoleculeId] = { @@ -247,7 +249,7 @@ object GroupReadsByUmi { } } - assignIdsToNodes(roots.result) + assignIdsToNodes(roots.result()) } } @@ -462,8 +464,9 @@ class GroupReadsByUmi val minUmiLength: Option[Int] = None, @arg(flag='x', doc= """Allow read pairs with primary alignments on different contigs to be grouped when using the |paired assigner (otherwise filtered out).""") - val allowInterContig: Boolean = false -)extends FgBioTool with LazyLogging { + val allowInterContig: Boolean = false, + @arg(doc="The number of threads to use while grouping.") val threads: Int = 1 +) extends FgBioTool with LazyLogging { import GroupReadsByUmi._ require(this.minUmiLength.forall(_ => this.strategy != Strategy.Paired), "Paired strategy cannot be used with --min-umi-length") @@ -554,24 +557,41 @@ class GroupReadsByUmi val out = SamWriter(output, outHeader) val iterator = Bams.templateIterator(sorter.iterator, out.header, Bams.MaxInMemory, Io.tmpDir) - val tagFamilySizeCounter = new NumericCounter[Int]() - - while (iterator.hasNext) { - // Take the next set of templates by position and assign UMIs - val templates = takeNextGroup(iterator) - assignUmiGroups(templates) - - // Then output the records in the right order (assigned tag, read name, r1, r2) - val templatesByMi = templates.groupBy { t => t.r1.get[String](this.assignTag) } - templatesByMi.keys.toSeq.sortBy(id => (id.length, id)).foreach(tag => { - templatesByMi(tag).sortBy(t => t.name).flatMap(t => t.primaryReads).foreach(rec => { - out += rec - }) - }) + val tagFamilySizeCounters = new IterableThreadLocal(() => new NumericCounter[Int]()) + val pool = new ForkJoinPool(threads, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true) + + Iterator.continually(if (iterator.hasNext) takeNextGroup(iterator) else Seq.empty) + .takeWhile(_.nonEmpty) + .grouped(25000) + .foreach { group => + group + .parWith(pool) + .map { templates: Seq[Template] => + // Take the next set of templates by position and assign UMIs + assignUmiGroups(templates) + + // Then output the records in the right order (assigned tag, read name, r1, r2) + val templatesByMi = templates.groupBy { t => t.r1.get[String](this.assignTag) } + + // Count up the family sizes + val tagFamilySizeCounter = tagFamilySizeCounters.get() + templatesByMi.values.foreach(ps => tagFamilySizeCounter.count(ps.size)) + + templatesByMi.keys.toSeq.sortBy(id => (id.length, id)).flatMap { tag => + templatesByMi(tag).sortBy(t => t.name).flatMap(t => t.primaryReads) + } + } + .seq + .iterator + .flatten + .foreach { rec => out += rec } + } - // Count up the family sizes - templatesByMi.values.foreach(ps => tagFamilySizeCounter.count(ps.size)) + // Gather the counters per thread + val tagFamilySizeCounter = new NumericCounter[Int]() + tagFamilySizeCounters.foreach { counter => + tagFamilySizeCounter += counter } out.close() @@ -669,6 +689,7 @@ class GroupReadsByUmi fail(s"Template ${t.name} has only one read, paired-reads required for paired strategy.") case (Some(r1), _, _) => r1[String](this.rawTag) + case _ => fail(f"Bug: template ${t.name}") } } }