diff --git a/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/Inserts.scala b/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/Inserts.scala new file mode 100644 index 0000000..7fe46b2 --- /dev/null +++ b/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/Inserts.scala @@ -0,0 +1,64 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.quinine.cli + +import org.apache.spark.SparkContext +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.quinine.metrics.insert.InsertSizeDistribution +import org.bdgenomics.utils.cli._ +import org.kohsuke.args4j.{ Argument, Option => Args4jOption } + +object Inserts extends BDGCommandCompanion { + + val commandName = "insertSizeDistribution" + val commandDescription = "Computes the insert size distribution of a set of reads." + + def apply(cmdLine: Array[String]): BDGCommand = { + new Inserts(Args4j[InsertsArgs](cmdLine)) + } +} + +class InsertsArgs extends Args4jBase { + @Argument(required = true, + metaVar = "READS", + usage = "The sequenced dataset.", + index = 0) + val readPath: String = null + + @Args4jOption(required = false, + name = "-statPath", + usage = "File to write stats to. If omitted, writes to standard out.") + val statPath: String = null +} + +class Inserts(val args: InsertsArgs) extends BDGSparkCommand[InsertsArgs] { + + val companion = Inserts + + def run(sc: SparkContext) { + + // load in the reads + val reads = sc.loadAlignments(args.readPath) + + // compute the insert size distribution + val insertSizeDistribution = InsertSizeDistribution(reads) + + // write out the stats + StatWriter(insertSizeDistribution, args.statPath, sc) + } +} diff --git a/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/QuinineMain.scala b/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/QuinineMain.scala index ae96443..a191624 100644 --- a/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/QuinineMain.scala +++ b/quinine-cli/src/main/scala/org/bdgenomics/quinine/cli/QuinineMain.scala @@ -34,6 +34,7 @@ object QuinineMain extends Logging { CompareADAM, EstimateContamination, FindReads, + Inserts, PanelMetrics, RNAMetrics)), CommandGroup( diff --git a/quinine-core/src/main/scala/org/bdgenomics/quinine/metrics/insert/InsertSizeDistribution.scala b/quinine-core/src/main/scala/org/bdgenomics/quinine/metrics/insert/InsertSizeDistribution.scala new file mode 100644 index 0000000..2b12121 --- /dev/null +++ b/quinine-core/src/main/scala/org/bdgenomics/quinine/metrics/insert/InsertSizeDistribution.scala @@ -0,0 +1,119 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.quinine.metrics.insert + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.models.ReferenceRegion +import org.bdgenomics.formats.avro.AlignmentRecord + +/** + * Helper object for computing the insert size distribution of reads sequenced + * using a paired end sequencing protocol. + */ +private[quinine] object InsertSizeDistribution extends Serializable with Logging { + + /** + * @param rdd An RDD of reads. + * @return The insert size distribution across an RDD of reads. + */ + def apply(rdd: RDD[AlignmentRecord]): InsertSizeDistribution = { + InsertSizeDistribution(rdd.flatMap(insertSize) + .countByValue + .toMap) + } + + /** + * Computes the insert size of a single read. + * + * Computes the insert size of a read, as is used in the insert size + * aggregator. This applies several filters before computing the insert + * size: + * + * 1. The read must be paired, and be the first read from the pair. + * 2. The read must be mapped, and it's mate must also be mapped. + * 3. The two reads must be mapped to the same contig. + * + * These filters are necessary in order to compute the total insert size + * distribution across all reads without duplicating reads from a given + * fragment. + * + * @param read The read to compute the insert size of. + * @return Optionally returns the insert size. + */ + private[insert] def insertSize(read: AlignmentRecord): Option[Long] = { + // we only return an insert size if: + // 1. the read is paired + // 2. the read is the first read from the pair + // 3. the read is mapped + // 4. the mapping is a primary alignment + // 5. the read's mate is mapped + // 6. the read and it's pair are mapped to the same contig + try { + if (read.getReadPaired && + read.getReadInFragment == 0 && + read.getReadMapped && + read.getPrimaryAlignment && + read.getMateMapped && + read.getContig.getContigName == read.getMateContig.getContigName) { + ReferenceRegion(read).distance(ReferenceRegion(read.getMateContig.getContigName, + read.getMateAlignmentStart, + read.getMateAlignmentEnd)) + } else { + None + } + } catch { + case e: Throwable => { + log.warn("Caught exception %s when processing read %s. Ignoring.".format( + e, read)) + None + } + } + } +} + +/** + * @param insertSizes A map between insert sizes and the number of times that + * a read pair was seen with that insert size. + */ +case class InsertSizeDistribution(insertSizes: Map[Long, Long]) { + + /** + * @return Returns the number of fragments we observed. + */ + private[insert] def numFragments(): Long = insertSizes.values.sum + + /** + * @return Returns the mean insert size. + */ + def mean(): Double = { + insertSizes.map(kv => kv._1 * kv._2) + .sum + .toDouble / numFragments().toDouble + } + + override def toString: String = { + val distribution = insertSizes.toSeq + .sortBy(_._1) + .map(kv => "%d: %d".format(kv._1, kv._2)) + .mkString("\n") + + "Mean insert size: %3.2f\nInsert size distribution:\n%s".format(mean, + distribution) + } +} diff --git a/quinine-core/src/test/scala/org/bdgenomics/quinine/metrics/insert/InsertSizeDistributionSuite.scala b/quinine-core/src/test/scala/org/bdgenomics/quinine/metrics/insert/InsertSizeDistributionSuite.scala new file mode 100644 index 0000000..f230cc5 --- /dev/null +++ b/quinine-core/src/test/scala/org/bdgenomics/quinine/metrics/insert/InsertSizeDistributionSuite.scala @@ -0,0 +1,224 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.quinine.metrics.insert + +import org.bdgenomics.adam.util.ADAMFunSuite +import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig } + +class InsertSizeDistributionSuite extends ADAMFunSuite { + + val ctg1 = Contig.newBuilder() + .setContigName("ctg1") + .build() + + val ctg2 = Contig.newBuilder() + .setContigName("ctg2") + .build() + + val unpaired = AlignmentRecord.newBuilder() + .setReadPaired(false) + .build() + + val secondOfPair = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(1) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setContig(ctg1) + .setStart(10L) + .setEnd(50L) + .setMateMapped(true) + .setMateContig(ctg1) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(150L) + .build() + + val unmappedFirstOfPair = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(false) + .setMateMapped(true) + .setMateContig(ctg1) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(150L) + .build() + + val firstOfPairMateUnmapped = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setContig(ctg1) + .setStart(100L) + .setEnd(150L) + .setMateMapped(false) + .build() + + val secondaryFirstOfPair = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(false) + .setContig(ctg1) + .setStart(10L) + .setEnd(50L) + .setMateMapped(true) + .setMateContig(ctg1) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(150L) + .build() + + val chimericFragmentFirstOfPair = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setContig(ctg1) + .setStart(10L) + .setEnd(50L) + .setMateMapped(true) + .setMateContig(ctg2) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(150L) + .build() + + val firstBeforeSecond = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setContig(ctg1) + .setStart(10L) + .setEnd(51L) + .setMateMapped(true) + .setMateContig(ctg1) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(151L) + .build() + + val firstAfterSecond = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setContig(ctg1) + .setStart(250L) + .setEnd(301L) + .setMateMapped(true) + .setMateContig(ctg1) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(151L) + .build() + + val badRead = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setMateMapped(true) + .setMateContig(ctg1) + .setMateAlignmentStart(100L) + .setMateAlignmentEnd(151L) + .build() + + val badMate = AlignmentRecord.newBuilder() + .setReadPaired(true) + .setReadInFragment(0) + .setReadMapped(true) + .setPrimaryAlignment(true) + .setContig(ctg1) + .setStart(250L) + .setEnd(301L) + .setMateMapped(true) + .build() + + test("unpaired read should not return an insert size") { + val insert = InsertSizeDistribution.insertSize(unpaired) + assert(insert.isEmpty) + } + + test("second of pair read should not return an insert size") { + val insert = InsertSizeDistribution.insertSize(secondOfPair) + assert(insert.isEmpty) + } + + test("unmapped first of pair read should not return an insert size") { + val insert = InsertSizeDistribution.insertSize(unmappedFirstOfPair) + assert(insert.isEmpty) + } + + test("first of pair read with unmapped mate should not return an insert size") { + val insert = InsertSizeDistribution.insertSize(firstOfPairMateUnmapped) + assert(insert.isEmpty) + } + + test("first of pair read with non-primary alignment should not return an insert size") { + val insert = InsertSizeDistribution.insertSize(secondaryFirstOfPair) + assert(insert.isEmpty) + } + + test("first of pair read from chimeric fragment should not return an insert size") { + val insert = InsertSizeDistribution.insertSize(chimericFragmentFirstOfPair) + assert(insert.isEmpty) + } + + test("first of pair before second of pair") { + val insert = InsertSizeDistribution.insertSize(firstBeforeSecond) + assert(insert.isDefined) + assert(insert.get === 50L) + } + + test("first of pair after second of pair") { + val insert = InsertSizeDistribution.insertSize(firstAfterSecond) + assert(insert.isDefined) + assert(insert.get === 100L) + } + + test("malformed read should be ignored") { + val insert = InsertSizeDistribution.insertSize(badRead) + assert(insert.isEmpty) + } + + test("malformed mate should be ignored") { + val insert = InsertSizeDistribution.insertSize(badMate) + assert(insert.isEmpty) + } + + sparkTest("aggregate across reads") { + val rdd = sc.parallelize(Seq(unpaired, + secondOfPair, + unmappedFirstOfPair, + firstOfPairMateUnmapped, + secondaryFirstOfPair, + chimericFragmentFirstOfPair, + firstBeforeSecond, + firstAfterSecond, + firstAfterSecond, + firstAfterSecond, + firstAfterSecond, + badRead, + badMate)) + val distribution = InsertSizeDistribution(rdd) + assert(distribution.insertSizes.size === 2) + assert(distribution.insertSizes(50L) === 1L) + assert(distribution.insertSizes(100L) === 4L) + assert(distribution.numFragments() === 5L) + val mean = distribution.mean + assert(mean > 89.999999 && mean < 90.000001) + } +}