Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yf coverage unifier #891

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
24 changes: 24 additions & 0 deletions src/main/scala/com/fulcrumgenomics/bam/Bams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,30 @@ object Bams extends LazyLogging {
new SelfClosingIterator(_iterator, () => queryIterator.close())
}

/** Return an iterator over records sorted and grouped into [[Template]] objects. Sort order is deterministic, but
  * random (based on a hash of the queryname). See [[templateIterator]] for a [[Template]] iterator which emits
  * templates in a non-guaranteed sort order.
  *
  * @see [[templateIterator]]
  * @param iterator an iterator from which to consume records
  * @param header the header associated with the records
  * @param maxInMemory the maximum number of records to keep and sort in memory, if sorting is needed
  * @param tmpDir a temp directory to use for temporary sorting files if sorting is needed
  * @param randomSeed the seed used when hashing querynames; fixed by default so output order is reproducible
  * @return an Iterator of Template objects in a random (but deterministic) queryname-grouped order
  */
def templateRandomIterator(iterator: Iterator[SamRecord],
                           header: SAMFileHeader,
                           maxInMemory: Int,
                           tmpDir: DirPath = Io.tmpDir,
                           randomSeed: Int = 42): SelfClosingIterator[Template] = {
  // Sort records by a hash of their queryname; queryGroup=true keeps reads of a template adjacent.
  val sortedIterator = RandomizeBam.Randomize(iterator, header, randomSeed, maxInMemory, queryGroup = true, tmpDir).iterator

  // Mark a cloned header as RandomQuery-sorted so templateIterator does not re-sort the records.
  val sortedHead = SamOrder.RandomQuery.applyTo(header.clone())

  Bams.templateIterator(sortedIterator, sortedHead, maxInMemory, tmpDir)
}

/** Returns an iterator over the records in the given iterator such that the order of the records returned is
* determined by the value of the given SAM tag, which can optionally be transformed.
*
Expand Down
60 changes: 35 additions & 25 deletions src/main/scala/com/fulcrumgenomics/bam/RandomizeBam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,59 +28,69 @@ package com.fulcrumgenomics.bam
* Created by tfenne on 6/23/16.
*/

// import com.fulcrumgenomics.FgBioDef._


import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.RandomizeBam.Randomize
import com.fulcrumgenomics.bam.api._
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.commons.util.{LazyLogging, Logger}
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter}
import htsjdk.samtools.SAMFileHeader
import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder}
import htsjdk.samtools.util.Murmur3

@clp(group=ClpGroups.SamOrBam, description=
"""
|Randomizes the order of reads in a SAM or BAM file. Randomization is done by sorting
|on a hash of the `queryname` (and bases and quals if not query-grouping). By default
|reads with the same query name are grouped together in the output file; this can be
|turned off by specifying --query-group=false.
""")
class RandomizeBam(
@arg(flag='i', doc="The input SAM or BAM file.") val input: PathToBam = Io.StdIn,
@arg(flag='o', doc="The output SAM or BAM file.") val output: PathToBam = Io.StdOut,
@arg(flag='s', doc="Random seed.") val seed: Int = 42,
@arg(flag='q', doc="Group together reads by queryname.") val queryGroup: Boolean = true
) extends FgBioTool with LazyLogging {
object RandomizeBam extends LazyLogging {

  /** Sorts records into a random, but deterministic (seed-driven), order.
    *
    * When `queryGroup` is true the sort key is a hash of the queryname only, so all reads sharing a
    * queryname end up adjacent in the output; otherwise the key also incorporates bases and quals so
    * reads of the same template are randomized independently.
    *
    * @param in an iterator of the records to randomize; fully consumed by this call
    * @param header the header associated with the records
    * @param seed the random seed used to initialize the hasher
    * @param maxNumInRam the maximum number of records to hold in memory before spilling to disk
    * @param queryGroup if true, keep reads with the same queryname grouped together
    * @param tmpDir a temp directory for spill files if disk-backed sorting is needed
    * @return a Sorter whose iterator yields the records in randomized order
    */
  def Randomize(in: Iterator[SamRecord],
                header: SAMFileHeader,
                seed: Int,
                maxNumInRam: Int,
                queryGroup: Boolean,
                tmpDir: DirPath = Io.tmpDir): Sorter[SamRecord, _] = {
    val hasher = new Murmur3(seed)

    val sorter = {
      if (queryGroup) {
        // Hash only the name so that mates/supplementaries of a template sort adjacently.
        val f = (r: SamRecord) => SamOrder.RandomQueryKey(hasher.hashUnencodedChars(r.name), r.name, r.asSam.getFlags)
        new Sorter(maxNumInRam, new SamRecordCodec(header), f, tmpDir)
      }
      else {
        // Include bases and quals in the key so identical names still randomize independently.
        val f = (r: SamRecord) => {
          val key = s"${r.name}:${r.basesString}:${r.qualsString}"
          SamOrder.RandomKey(hasher.hashUnencodedChars(key), r.asSam.getFlags)
        }
        new Sorter(maxNumInRam, new SamRecordCodec(header), f, tmpDir)
      }
    }

    logger.info("Sorting reads into random order.")
    val sortProgress = ProgressLogger(logger, verb = "sorted", unit = 5e6.toInt)
    in.foreach { rec =>
      sorter += rec
      sortProgress.record()
    }
    sorter
  }
}


@clp(group = ClpGroups.SamOrBam, description =
"""
|Randomizes the order of reads in a SAM or BAM file. Randomization is done by sorting
|on a hash of the `queryname` (and bases and quals and flags if not query-grouping). By default
|reads with the same query name are grouped together in the output file; this can be
|turned off by specifying --query-group=false.
""")
class RandomizeBam(
@arg(flag = 'i', doc = "The input SAM or BAM file.") val input: PathToBam = Io.StdIn,
@arg(flag = 'o', doc = "The output SAM or BAM file.") val output: PathToBam = Io.StdOut,
@arg(flag = 's', doc = "Random seed.") val seed: Int = 42,
@arg(flag = 'q', doc = "Group together reads by queryname.") val queryGroup: Boolean = true
) extends FgBioTool with LazyLogging {

Io.assertReadable(input)
Io.assertCanWriteFile(output)

override def execute(): Unit = {
val in = SamSource(input)

val sorter = Randomize(in.iterator, in.header, seed, 2e6.toInt, queryGroup)

logger.info("Writing reads out to output.")
val outHeader = in.header.clone()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* The MIT License
*
* Copyright (c) 2022 Fulcrum Genomics
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/

package com.fulcrumgenomics.personal.yfarjoun

import com.fulcrumgenomics.FgBioDef.javaIterableToIterator
import com.fulcrumgenomics.bam.Template
import com.fulcrumgenomics.bam.api.SamRecord
import com.fulcrumgenomics.personal.yfarjoun.CoverageManager.readFilterForCoverage
import com.fulcrumgenomics.util.NumericTypes.PhredScore
import htsjdk.samtools.SAMRecord
import htsjdk.samtools.util.{Interval, IntervalList, Locatable, OverlapDetector}

import scala.jdk.CollectionConverters.{IteratorHasAsJava, IteratorHasAsScala}

object CoverageManager {

  /**
   * Predicate determining whether a record counts towards coverage.
   *
   * Examines the record's flags and mapping quality: passes (returns true) for records that are
   * mapped with mapping quality >= `minMapQ`, pass PF, are not duplicates, and are not secondary
   * alignments (primary and supplementary records may pass).
   *
   * @param record input record to filter
   * @param minMapQ minimum mapping quality for passing reads
   * @return true if the record is mapped with mapQ >= minMapQ, non-duplicate, pf, and non-secondary
   */
  def readFilterForCoverage(record: SamRecord, minMapQ: PhredScore): Boolean = {
    record.mapped &&
      record.pf &&
      !record.secondary &&
      !record.duplicate &&
      record.mapq >= minMapQ
  }

  /**
   * Reduces the given intervals to a *non-overlapping* sequence covering the same territory.
   *
   * The result spans exactly the original territory and is guaranteed to contain no
   * overlapping intervals.
   */
  def squish(ls: IterableOnce[Interval]): IterableOnce[Interval] = {
    // Merger arguments: combine abutting intervals = true, keep names = false, keep strand = false.
    val merged = new IntervalList.IntervalMergerIterator(ls.iterator.asJava, true, false, false)
    merged.asScala
  }
}

/**
 * Class for managing coverage over an interval list.
 *
 * Holds an IntervalList of interest, a target coverage value and a minimum mapping quality for reads to be considered
 * as providing coverage.
 *
 * Coverage is tracked per-base in mutable LocusTrack objects (capped at Short.MaxValue), and is
 * updated as a side-effect of processTemplates.
 */
class CoverageManager(val intervals: IntervalList, val minMapQ: PhredScore, val coverageTarget: Int) {

// Overlap index mapping genomic position to the mutable coverage track of each (uniqued) input interval.
private val coverage: OverlapDetector[LocusTrack] = new OverlapDetector[LocusTrack](0, 0)

// Register one zero-initialized track per uniqued interval.
intervals.uniqued().foreach(i => coverage.addLhs(new LocusTrack(i), i))

// Returns the slices of registered tracks that overlap the given locus; each slice writes
// through to its parent track.
private def getCoverageTracks(locus: Locatable): Iterator[LocusTrack] = {
coverage.getOverlaps(locus).map(lt => lt.sliceToLocus(locus))
}

/**
 * Determines if a locus overlaps a region that needs coverage, i.e. whether any overlapped
 * base is still below the target coverage.
 */
def needsCoverage(locus: Locatable): Boolean = {
getCoverageTracks(locus).exists(lt => lt.track.exists(_ < coverageTarget))
}

/**
 * Increments the coverage map over an interval.
 * Will not increment beyond Short.MaxValue to avoid overflow
 */
private[yfarjoun] def incrementCoverage(locatable: Locatable): Unit = {
getCoverageTracks(locatable).foreach(locusTrack =>
locusTrack.track.indices
.foreach(j => locusTrack.track(j) = Math.min(Short.MaxValue, locusTrack.track(j) + 1).toShort))
}

/** Resets all tracked coverage back to zero; for testing. */
private[yfarjoun] def resetCoverage(): Unit = {
coverage.getAll.foreach(locusTrack =>
locusTrack.track.indices
.foreach(j => locusTrack.track.update(j, 0.toShort)))
}

/**
 * Filters, an Iterator[Template] while recording the resulting coverage
 * **with side-effects** as follows:
 *
 * 1. Only templates that have reads that count towards coverage, AND are needed in order to hit the target coverage
 * pass the filters.
 * 2. coverage over the overlapping region is incremented (only by 1 if overlapping a read which counts towards coverage)
 * 3. templates are returned in their entirety, or not at all. no partial Templates are returned.
 * 4. Any template which is returned contains at least one record whose coverage was used towards the target.
 * 5. returns a Seq rather than an Iterator so that the side-effects will happen for sure.
 *
 * @param templateIterator input Template Iterator
 * @return filtered and processed templates.
 */
def processTemplates(templateIterator: Iterator[Template]): Seq[Template] = {
templateIterator
// Only consider templates that contain reads which cover regions
// that need coverage, i.e. that at least one base that they cover is less
// than the target coverage
.filter(t => {
val coverageReads: Seq[SAMRecord] = t.allReads.filter(readFilterForCoverage(_, minMapQ)).map(_.asSam).toSeq
val templateNeeded = coverageReads.exists(needsCoverage(_))
// Side-effect: record the kept template's reads so later templates see updated coverage.
if (templateNeeded) coverageReads.foreach(incrementCoverage(_))
templateNeeded
}).toSeq
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* The MIT License
*
* Copyright (c) 2022 Fulcrum Genomics
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
*/
package com.fulcrumgenomics.personal.yfarjoun

import htsjdk.samtools.util.{Interval, Locatable}
import scala.collection.mutable

object LocusTrack {
  /** Shared empty track, returned whenever a slice has no overlap with the requested locus. */
  val Empty = new LocusTrack(None)

  /**
   * A small class which implements a "connected slice", i.e. a slice of a mutable.Seq which, when modified
   * simply modifies its "parent" Seq. Unlike Seq.slice and Array.slice, reads and writes via
   * apply/update do not copy data.
   *
   * @param mySeq original Seq[A] of which this is a slice
   * @param from index from which slice starts (0-based, inclusive). Must be >= 0
   * @param to index to which slice continues (0-based, exclusive). Must be >= from and <= mySeq.size
   * @tparam A Type of Seq
   */
  class ConnectedSlice[A](val mySeq: mutable.Seq[A], val from: Int, val to: Int) extends mutable.Seq[A] {
    // Use require (not assert) so invalid bounds fail even when assertions are disabled
    // (-Xdisable-assertions) and raise IllegalArgumentException, the conventional precondition error.
    require(from >= 0, s"from must be >= 0, found $from")
    require(to >= from, s"to ($to) must be >= from ($from)")
    require(to <= mySeq.size, s"to ($to) must be <= parent size (${mySeq.size})")

    /**
     * Updates element idx in the slice to value elem. Writes through to the appropriate
     * position in the parent Seq.
     * @param idx index of element in the slice to update
     * @param elem new value of element
     */
    @throws[IndexOutOfBoundsException]
    override def update(idx: Int, elem: A): Unit = {
      if (idx < 0 || idx >= length) {
        throw new IndexOutOfBoundsException(idx)
      }
      mySeq.update(idx + from, elem)
    }

    /** Get the element at the specified index, reading through to the parent Seq. */
    @throws[IndexOutOfBoundsException]
    override def apply(i: Int): A = {
      if (i < 0 || i >= length) {
        throw new IndexOutOfBoundsException(i)
      }
      mySeq.apply(i + from)
    }

    /** Number of elements in the slice. */
    override val length: Int = to - from

    /** Provides an iterator over the elements between from and to in mySeq.
      * NOTE: slice copies the covered range, so this iterator is a snapshot taken at call time,
      * not a live view of the parent Seq.
      */
    override def iterator: Iterator[A] = mySeq.slice(from, to).iterator
  }
}

class LocusTrack(val locus: Option[Interval], val track: mutable.Seq[Short]) {

  /** Builds a track sized to the locus (zero-filled), or an empty track when no locus is given. */
  def this(l: Option[Interval]) = {
    this(l, l match {
      case Some(loc) => new Array[Short](loc.length())
      case None      => mutable.Seq.empty
    })
  }

  /** Builds a track covering the given interval. */
  def this(l: Interval) = this(Some(l))

  /**
   * Returns a LocusTrack over the region where this track's locus overlaps the given locus.
   * The returned track writes through to this track's underlying data; when there is no
   * overlap (or this track has no locus) LocusTrack.Empty is returned instead.
   */
  def sliceToLocus(l: Locatable): LocusTrack = {
    locus match {
      case Some(loc) if loc.contigsMatch(l) =>
        val begin = Math.max(loc.getStart, l.getStart)
        val end   = Math.min(loc.getEnd, l.getEnd)
        if (end < begin) {
          LocusTrack.Empty
        } else {
          val overlapLocus = new Interval(loc.getContig, begin, end)
          val overlapTrack = new LocusTrack.ConnectedSlice(track, begin - loc.getStart, end + 1 - loc.getStart)
          new LocusTrack(Some(overlapLocus), overlapTrack)
        }
      case _ => LocusTrack.Empty
    }
  }
}
Loading