Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUININE-11] Compute insert size distribution. #38

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.quinine.cli

import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.quinine.metrics.insert.InsertSizeDistribution
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

/**
 * Companion object for the insert size distribution command.
 *
 * Provides the command name/description used by the CLI dispatcher, and a
 * factory method that parses the command line into a runnable command.
 */
object Inserts extends BDGCommandCompanion {

  val commandName = "insertSizeDistribution"
  val commandDescription = "Computes the insert size distribution of a set of reads."

  /**
   * @param cmdLine Raw command line arguments for this command.
   * @return A command instance wrapping the parsed arguments.
   */
  def apply(cmdLine: Array[String]): BDGCommand = {
    val parsedArgs = Args4j[InsertsArgs](cmdLine)
    new Inserts(parsedArgs)
  }
}

/**
 * Args4j-parsed arguments for the insert size distribution command.
 *
 * READS is a required positional argument; -statPath is optional and, when
 * omitted (left null), causes stats to be written to standard out.
 */
class InsertsArgs extends Args4jBase {
  // Required positional argument: path to the read dataset to load.
  @Argument(required = true,
    metaVar = "READS",
    usage = "The sequenced dataset.",
    index = 0)
  val readPath: String = null

  // Optional output path; null signals "write to standard out" downstream.
  @Args4jOption(required = false,
    name = "-statPath",
    usage = "File to write stats to. If omitted, writes to standard out.")
  val statPath: String = null
}

/**
 * Command that computes and reports the insert size distribution for a
 * dataset of sequenced reads.
 *
 * @param args The parsed command line arguments (read path and optional
 *   stat output path).
 */
class Inserts(val args: InsertsArgs) extends BDGSparkCommand[InsertsArgs] {

  val companion = Inserts

  def run(sc: SparkContext) {

    // load the alignment records from the user-provided path
    val alignments = sc.loadAlignments(args.readPath)

    // aggregate the insert size distribution across the whole dataset
    val distribution = InsertSizeDistribution(alignments)

    // write the stats to the requested file, or stdout if no path was given
    StatWriter(distribution, args.statPath, sc)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ object QuinineMain extends Logging {
CompareADAM,
EstimateContamination,
FindReads,
Inserts,
PanelMetrics,
RNAMetrics)),
CommandGroup(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.quinine.metrics.insert

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.formats.avro.AlignmentRecord

import scala.util.control.NonFatal

/**
 * Helper object for computing the insert size distribution of reads sequenced
 * using a paired end sequencing protocol.
 */
private[quinine] object InsertSizeDistribution extends Serializable with Logging {

  /**
   * Aggregates the insert size distribution across a dataset of reads.
   *
   * Reads that do not pass the filters in [[insertSize]] contribute nothing
   * to the distribution.
   *
   * @param rdd An RDD of reads.
   * @return The insert size distribution across an RDD of reads.
   */
  def apply(rdd: RDD[AlignmentRecord]): InsertSizeDistribution = {
    InsertSizeDistribution(rdd.flatMap(insertSize)
      .countByValue
      .toMap)
  }

  /**
   * Computes the insert size of a single read.
   *
   * Computes the insert size of a read, as is used in the insert size
   * aggregator. This applies several filters before computing the insert
   * size:
   *
   * 1. The read must be paired, and be the first read from the pair.
   * 2. The read must be mapped, and its mate must also be mapped.
   * 3. The two reads must be mapped to the same contig.
   *
   * These filters are necessary in order to compute the total insert size
   * distribution across all reads without duplicating reads from a given
   * fragment.
   *
   * @param read The read to compute the insert size of.
   * @return Optionally returns the insert size.
   */
  private[insert] def insertSize(read: AlignmentRecord): Option[Long] = {
    // we only return an insert size if:
    // 1. the read is paired
    // 2. the read is the first read from the pair
    // 3. the read is mapped
    // 4. the mapping is a primary alignment
    // 5. the read's mate is mapped
    // 6. the read and its pair are mapped to the same contig
    try {
      if (read.getReadPaired &&
        read.getReadInFragment == 0 &&
        read.getReadMapped &&
        read.getPrimaryAlignment &&
        read.getMateMapped &&
        read.getContig.getContigName == read.getMateContig.getContigName) {
        // distance between the read's region and its mate's region; fields on
        // the record may be null for malformed reads, which the catch handles
        ReferenceRegion(read).distance(ReferenceRegion(read.getMateContig.getContigName,
          read.getMateAlignmentStart,
          read.getMateAlignmentEnd))
      } else {
        None
      }
    } catch {
      // catch NonFatal instead of Throwable so that fatal JVM errors
      // (OutOfMemoryError, InterruptedException, etc.) still propagate
      case NonFatal(e) => {
        log.warn("Caught exception %s when processing read %s. Ignoring.".format(
          e, read))
        None
      }
    }
  }
}

/**
 * Statistics describing the observed insert sizes across a dataset.
 *
 * @param insertSizes A map between insert sizes and the number of times that
 *   a read pair was seen with that insert size.
 */
case class InsertSizeDistribution(insertSizes: Map[Long, Long]) {

  /**
   * @return The total number of fragments observed, summed over all insert
   *   size bins.
   */
  private[insert] def numFragments(): Long = insertSizes.valuesIterator.sum

  /**
   * @return The mean insert size, weighting each size by its observation
   *   count.
   */
  def mean(): Double = {
    val weightedSum = insertSizes.foldLeft(0L) {
      case (acc, (size, count)) => acc + size * count
    }
    weightedSum.toDouble / numFragments().toDouble
  }

  /**
   * @return A human readable report: the mean, followed by one "size: count"
   *   line per bin in ascending size order.
   */
  override def toString: String = {
    val binLines = insertSizes.toSeq
      .sortBy(_._1)
      .map { case (size, count) => "%d: %d".format(size, count) }

    "Mean insert size: %3.2f\nInsert size distribution:\n%s".format(mean,
      binLines.mkString("\n"))
  }
}
Loading