From d91fef41cb549c30b14404bedb81b58f2076f964 Mon Sep 17 00:00:00 2001 From: "Andrew (Drew) Titus" Date: Tue, 20 Aug 2024 14:12:09 -0400 Subject: [PATCH] [0.2.0] add alignment_score() function to Python API --- README.md | 27 ++++++++- pyproject.toml | 3 +- rust/src/lib.rs | 42 ++++++++++++++ src/sequence_align/pairwise.py | 88 ++++++++++++++++++++++++------ tests/unit/test_alignment_score.py | 46 ++++++++++++++++ 5 files changed, 187 insertions(+), 19 deletions(-) create mode 100644 tests/unit/test_alignment_score.py diff --git a/README.md b/README.md index b241858..11da52d 100644 --- a/README.md +++ b/README.md @@ -33,10 +33,13 @@ where `M` and `N` are the lengths of the two sequences being aligned. Hirschberg to have the same time complexity (`O(M*N)`), but only use `O(min{M, N})` space, making it an appealing option for memory-limited applications or extremely large sequences. +One may also compute the Needleman-Wunsch alignment score for alignments produced by either algorithm +using [sequence_align.pairwise.alignment_score](src/sequence_align/pairwise.py). + Using these algorithms is straightforward: ``` python -from sequence_align.pairwise import hirschberg, needleman_wunsch +from sequence_align.pairwise import alignment_score, hirschberg, needleman_wunsch # See https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm#/media/File:Needleman-Wunsch_pairwise_sequence_alignment.png @@ -59,6 +62,17 @@ print(aligned_seq_a) # Expects ["G", "C", "A", "_", "T", "G", "C", "G"] print(aligned_seq_b) +# Expects 0 +score = alignment_score( + aligned_seq_a, + aligned_seq_b, + match_score=1.0, + mismatch_score=-1.0, + indel_score=-1.0, + gap="_", +) +print(score) + # See https://en.wikipedia.org/wiki/Hirschberg%27s_algorithm#Example seq_a = ["A", "G", "T", "A", "C", "G", "C", "A"] @@ -78,6 +92,17 @@ print(aligned_seq_a) # Expects ["_", "_", "T", "A", "T", "G", "C", "_"] print(aligned_seq_b) + +# Expects 1 +score = alignment_score( + aligned_seq_a, + aligned_seq_b, + match_score=2.0, + mismatch_score=-1.0, + indel_score=-2.0, + gap="_", +) +print(score) ``` ## Performance Benchmarks diff --git a/pyproject.toml b/pyproject.toml index 4a15e72..4bb5fd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "sequence_align" -version = "0.1.2" +version = "0.2.0" description = "Efficient implementations of Needleman-Wunsch and other sequence alignment algorithms in Rust with Python bindings." readme = "README.md" requires-python = ">=3.7" @@ -47,6 +47,7 @@ dev = [ "pylint", "pytest", "pytest-cov", + "pytest-subtests", "pyyaml", "types-psutil", "types-pyyaml", diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 8cfcc27..bcf6561 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -424,10 +424,52 @@ pub fn hirschberg( Ok((aligned_seq_one, aligned_seq_two)) } +/// Compute the alignment score for the given pair of aligned sequences. +/// +/// # Notes +/// The sequences must be of equal length. +/// +/// # Complexity +/// This takes O(n) time and O(1) space complexity, where n is the length of the sequence. +/// +/// # References +/// https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm +#[pyfunction] +#[pyo3(signature = (seq_one, seq_two, match_score=1.0, mismatch_score=-1.0, indel_score=-1.0, gap_val=-1))] +pub fn alignment_score( + seq_one: Vec, + seq_two: Vec, + match_score: f64, + mismatch_score: f64, + indel_score: f64, + gap_val: i64, +) -> PyResult { + let seq_one_len = seq_one.len(); + let seq_two_len = seq_two.len(); + if seq_one_len != seq_two_len { + return Err(PyValueError::new_err( + "Sequence lengths must match! Make sure to align the sequences before calling alignment_score()." + )); + } + + let mut score = 0.0; + for seq_idx in 0..seq_one_len { + if seq_one[seq_idx] == seq_two[seq_idx] { + score += match_score; + } else if (seq_one[seq_idx] == gap_val) || (seq_two[seq_idx] == gap_val) { + score += indel_score; + } else { + score += mismatch_score; + } + } + Ok(score) +} + /// A Python module implemented in Rust. #[pymodule] fn _sequence_align(_py: Python, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(needleman_wunsch, m)?)?; m.add_function(wrap_pyfunction!(hirschberg, m)?)?; + m.add_function(wrap_pyfunction!(alignment_score, m)?)?; Ok(()) } diff --git a/src/sequence_align/pairwise.py b/src/sequence_align/pairwise.py index fc327a0..a1bbb1e 100644 --- a/src/sequence_align/pairwise.py +++ b/src/sequence_align/pairwise.py @@ -1,5 +1,5 @@ # Copyright 2023-present Kensho Technologies, LLC. -from typing import List, Sequence, Tuple +from typing import Dict, List, Sequence, Tuple from sequence_align import _sequence_align # type: ignore @@ -11,12 +11,19 @@ def _entry2idx( seq_a: Sequence[str], seq_b: Sequence[str], gap: str, -) -> Tuple[List[str], List[int], List[int]]: - idx2symbol = sorted(set(seq_a).union(set(seq_b))) - symbol2idx = {idx2symbol[idx]: idx for idx in range(len(idx2symbol))} - if gap in symbol2idx: + allow_gap: bool = False, +) -> Tuple[Dict[int, str], List[int], List[int]]: + symbols = set(seq_a).union(set(seq_b)) + if not allow_gap and gap in symbols: raise ValueError(f'Gap entry "{gap}" found in seq_a and/or seq_b; must not exist in either') + symbols_without_gap = symbols - {gap} + idx2symbol: Dict[int, str] = { + _GAP_VAL: gap, + **{idx: symbol for idx, symbol in enumerate(sorted(symbols_without_gap))}, + } + symbol2idx = {symbol: idx for idx, symbol in idx2symbol.items()} + seq_a_indices = [symbol2idx[symbol] for symbol in seq_a] seq_b_indices = [symbol2idx[symbol] for symbol in seq_b] @@ -24,7 +31,7 @@ def _entry2idx( def _idx2entry( - idx2symbol: List[str], + idx2symbol: Dict[int, str], seq_a_indices_aligned: List[int], seq_b_indices_aligned: List[int], gap: str, @@ -46,7 +53,7 @@ def needleman_wunsch( Args: seq_a: First sequence in pair to align. - seq_b: Second sequence in paior to align. + seq_b: Second sequence in pair to align. match_score: Score to apply for transitions where the sequences match each other at a given index. Defaults to 1. mismatch_score: Score to apply for transitions where the sequences do _not_ match each other @@ -82,10 +89,10 @@ def needleman_wunsch( seq_a_indices_aligned, seq_b_indices_aligned = _sequence_align.needleman_wunsch( seq_a_indices, seq_b_indices, - match_score, - mismatch_score, - indel_score, - _GAP_VAL, + match_score=match_score, + mismatch_score=mismatch_score, + indel_score=indel_score, + gap_val=_GAP_VAL, ) # Finally, map back and return @@ -104,7 +111,7 @@ def hirschberg( Args: seq_a: First sequence in pair to align. - seq_b: Second sequence in paior to align. + seq_b: Second sequence in pair to align. match_score: Score to apply for transitions where the sequences match each other at a given index. Defaults to 1. mismatch_score: Score to apply for transitions where the sequences do _not_ match each other @@ -139,17 +146,64 @@ def hirschberg( See https://en.wikipedia.org/wiki/Hirschberg%27s_algorithm for more information. """ # First, map the sequences to integers - idx2symbol, seq_a_indices, seq_b_indices = _entry2idx(seq_a, seq_b, gap) + idx2symbol, seq_a_indices, seq_b_indices = _entry2idx(seq_a, seq_b, gap=gap) # Now, run alignment in Rust seq_a_indices_aligned, seq_b_indices_aligned = _sequence_align.hirschberg( seq_a_indices, seq_b_indices, - match_score, - mismatch_score, - indel_score, - _GAP_VAL, + match_score=match_score, + mismatch_score=mismatch_score, + indel_score=indel_score, + gap_val=_GAP_VAL, ) # Finally, map back and return return _idx2entry(idx2symbol, seq_a_indices_aligned, seq_b_indices_aligned, gap) + + +def alignment_score( + aligned_seq_a: Sequence[str], + aligned_seq_b: Sequence[str], + match_score: float = 1.0, + mismatch_score: float = -1.0, + indel_score: float = -1.0, + gap: str = "-", +) -> float: + """Compute the alignment score for the pair of sequences. + + Args: + aligned_seq_a: First aligned sequence. + aligned_seq_b: Second aligned sequence. + match_score: Score to apply for transitions where the sequences match each other at a given + index. Defaults to 1. + mismatch_score: Score to apply for transitions where the sequences do _not_ match each other + at a given index. Defaults to -1. + indel_score: Score to apply for insertion/deletion transitions where one sequence advances + without the other advancing (thus inserting a gap). Defaults to -1. + gap: Value to use for marking gaps in the aligned sequences. Defaults to "-". + + Returns: + Needleman-Wunsch alignment score representing the sum of match, mismatch and + insertion/deletion transition scores for the provided alignment. + + Note: + See `NWScore()` function at + https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm for more information. + """ + # First, map the sequences to integers + _, aligned_seq_a_indices, aligned_seq_b_indices = _entry2idx( + aligned_seq_a, aligned_seq_b, gap, allow_gap=True + ) + + # Now, get the score + return float( + _sequence_align.alignment_score( + aligned_seq_a_indices, + aligned_seq_b_indices, + match_score=match_score, + mismatch_score=mismatch_score, + indel_score=indel_score, + gap_val=_GAP_VAL, + ) + ) diff --git a/tests/unit/test_alignment_score.py b/tests/unit/test_alignment_score.py new file mode 100644 index 0000000..f4347a1 --- /dev/null +++ b/tests/unit/test_alignment_score.py @@ -0,0 +1,46 @@ +# Copyright 2023-present Kensho Technologies, LLC. +import unittest + +from sequence_align.pairwise import alignment_score + + +# Try something non-default +DEFAULT_GAP = "?" + + +class TestNWScore(unittest.TestCase): + def test_empty(self) -> None: + """Score of two empty sequences should always be zero.""" + self.assertEqual(alignment_score([], []), 0) + + def test_unequal(self) -> None: + """Should fail with sequences of different length.""" + with self.assertRaises(ValueError): + alignment_score(["A", "B", "C"], ["D", "E"]) + + def test_normal(self) -> None: + """Score of two nonempty sequences should match and in both directions.""" + # Should be one insertion, one match, and one mismatch based on the scores + seq_a = ["A", "B", "C"] + seq_b = [DEFAULT_GAP, "B", "D"] + match_score = 1.234 + mismatch_score = -1.234 + indel_score = -1.012 + + expected_score = match_score + mismatch_score + indel_score + + for ab_order in [False, True]: + seq_a_proc = seq_a if ab_order else seq_b + seq_b_proc = seq_b if ab_order else seq_a + with self.subTest(ab_order=ab_order): + self.assertEqual( + alignment_score( + seq_a_proc, + seq_b_proc, + match_score=match_score, + mismatch_score=mismatch_score, + indel_score=indel_score, + gap=DEFAULT_GAP, + ), + expected_score, + )