Skip to content

Commit

Permalink
[0.2.0] add alignment_score() function to Python API (#19)
Browse files Browse the repository at this point in the history
This adds the Needleman-Wunsch alignment score function to the public Python API.

Fixes #18
  • Loading branch information
andrew-titus authored Aug 22, 2024
1 parent 8047296 commit 782f0f8
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 18 deletions.
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ where `M` and `N` are the lengths of the two sequences being aligned. Hirschberg
to have the same time complexity (`O(M*N)`), but only use `O(min{M, N})` space, making it an appealing option
for memory-limited applications or extremely large sequences.

One may also compute the Needleman-Wunsch alignment score for alignments produced by either algorithm
using [sequence_align.pairwise.alignment_score](src/sequence_align/pairwise.py).

Using these algorithms is straightforward:

``` python
from sequence_align.pairwise import hirschberg, needleman_wunsch
from sequence_align.pairwise import alignment_score, hirschberg, needleman_wunsch


# See https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm#/media/File:Needleman-Wunsch_pairwise_sequence_alignment.png
Expand All @@ -59,6 +62,17 @@ print(aligned_seq_a)
# Expects ["G", "C", "A", "_", "T", "G", "C", "G"]
print(aligned_seq_b)

# Expects 0
score = alignment_score(
aligned_seq_a,
aligned_seq_b,
match_score=1.0,
mismatch_score=-1.0,
indel_score=-1.0,
gap="_",
)
print(score)


# See https://en.wikipedia.org/wiki/Hirschberg%27s_algorithm#Example
seq_a = ["A", "G", "T", "A", "C", "G", "C", "A"]
Expand All @@ -78,6 +92,17 @@ print(aligned_seq_a)

# Expects ["_", "_", "T", "A", "T", "G", "C", "_"]
print(aligned_seq_b)

# Expects 1
score = alignment_score(
aligned_seq_a,
aligned_seq_b,
match_score=2.0,
mismatch_score=-1.0,
indel_score=-2.0,
gap="_",
)
print(score)
```

## Performance Benchmarks
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "maturin"

[project]
name = "sequence_align"
version = "0.1.2"
version = "0.2.0"
description = "Efficient implementations of Needleman-Wunsch and other sequence alignment algorithms in Rust with Python bindings."
readme = "README.md"
requires-python = ">=3.7"
Expand Down Expand Up @@ -47,6 +47,7 @@ dev = [
"pylint",
"pytest",
"pytest-cov",
"pytest-subtests",
"pyyaml",
"types-psutil",
"types-pyyaml",
Expand Down
54 changes: 54 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,64 @@ pub fn hirschberg(
Ok((aligned_seq_one, aligned_seq_two))
}

fn score_pair(
seq_one_val: i64,
seq_two_val: i64,
match_score: f64,
mismatch_score: f64,
indel_score: f64,
gap_val: i64,
) -> f64 {
if seq_one_val == seq_two_val {
match_score
} else if (seq_one_val == gap_val) || (seq_two_val == gap_val) {
indel_score
} else {
mismatch_score
}
}

/// Compute the alignment score for the given pair of aligned sequences.
///
/// # Notes
/// The sequences must be of equal length.
///
/// # Complexity
/// This takes O(n) time and O(1) space complexity, where n is the length of the sequence.
///
/// # References
/// https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm
#[pyfunction]
#[pyo3(signature = (seq_one, seq_two, match_score=1.0, mismatch_score=-1.0, indel_score=-1.0, gap_val=-1))]
pub fn alignment_score(
seq_one: Vec<i64>,
seq_two: Vec<i64>,
match_score: f64,
mismatch_score: f64,
indel_score: f64,
gap_val: i64,
) -> PyResult<f64> {
let seq_one_len = seq_one.len();
let seq_two_len = seq_two.len();
if seq_one_len != seq_two_len {
return Err(PyValueError::new_err(
"Sequence lengths must match! Make sure to align the sequences before calling alignment_score()."
));
}

let score = seq_one
.iter()
.zip(seq_two.iter())
.map(|(v1, v2)| score_pair(*v1, *v2, match_score, mismatch_score, indel_score, gap_val))
.sum::<f64>();
Ok(score)
}

/// A Python module implemented in Rust.
#[pymodule]
fn _sequence_align(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(needleman_wunsch, m)?)?;
m.add_function(wrap_pyfunction!(hirschberg, m)?)?;
m.add_function(wrap_pyfunction!(alignment_score, m)?)?;
Ok(())
}
86 changes: 70 additions & 16 deletions src/sequence_align/pairwise.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright 2023-present Kensho Technologies, LLC.
from typing import List, Sequence, Tuple
from typing import Dict, List, Sequence, Tuple

from sequence_align import _sequence_align # type: ignore

Expand All @@ -11,20 +11,27 @@ def _entry2idx(
seq_a: Sequence[str],
seq_b: Sequence[str],
gap: str,
) -> Tuple[List[str], List[int], List[int]]:
idx2symbol = sorted(set(seq_a).union(set(seq_b)))
symbol2idx = {idx2symbol[idx]: idx for idx in range(len(idx2symbol))}
if gap in symbol2idx:
allow_gap: bool = False,
) -> Tuple[Dict[int, str], List[int], List[int]]:
symbols = set(seq_a).union(set(seq_b))
if not allow_gap and gap in symbols:
raise ValueError(f'Gap entry "{gap}" found in seq_a and/or seq_b; must not exist in either')

symbols_without_gap = symbols - {gap}
idx2symbol: Dict[int, str] = {
_GAP_VAL: gap,
**{idx: symbol for idx, symbol in enumerate(sorted(symbols_without_gap))},
}
symbol2idx = {symbol: idx for idx, symbol in idx2symbol.items()}

seq_a_indices = [symbol2idx[symbol] for symbol in seq_a]
seq_b_indices = [symbol2idx[symbol] for symbol in seq_b]

return (idx2symbol, seq_a_indices, seq_b_indices)


def _idx2entry(
idx2symbol: List[str],
idx2symbol: Dict[int, str],
seq_a_indices_aligned: List[int],
seq_b_indices_aligned: List[int],
gap: str,
Expand All @@ -46,7 +53,7 @@ def needleman_wunsch(
Args:
seq_a: First sequence in pair to align.
seq_b: Second sequence in paior to align.
seq_b: Second sequence in pair to align.
match_score: Score to apply for transitions where the sequences match each other at a given
index. Defaults to 1.
mismatch_score: Score to apply for transitions where the sequences do _not_ match each other
Expand Down Expand Up @@ -82,10 +89,10 @@ def needleman_wunsch(
seq_a_indices_aligned, seq_b_indices_aligned = _sequence_align.needleman_wunsch(
seq_a_indices,
seq_b_indices,
match_score,
mismatch_score,
indel_score,
_GAP_VAL,
match_score=match_score,
mismatch_score=mismatch_score,
indel_score=indel_score,
gap_val=_GAP_VAL,
)

# Finally, map back and return
Expand All @@ -104,7 +111,7 @@ def hirschberg(
Args:
seq_a: First sequence in pair to align.
seq_b: Second sequence in paior to align.
seq_b: Second sequence in pair to align.
match_score: Score to apply for transitions where the sequences match each other at a given
index. Defaults to 1.
mismatch_score: Score to apply for transitions where the sequences do _not_ match each other
Expand Down Expand Up @@ -145,11 +152,58 @@ def hirschberg(
seq_a_indices_aligned, seq_b_indices_aligned = _sequence_align.hirschberg(
seq_a_indices,
seq_b_indices,
match_score,
mismatch_score,
indel_score,
_GAP_VAL,
match_score=match_score,
mismatch_score=mismatch_score,
indel_score=indel_score,
gap_val=_GAP_VAL,
)

# Finally, map back and return
return _idx2entry(idx2symbol, seq_a_indices_aligned, seq_b_indices_aligned, gap)


def alignment_score(
aligned_seq_a: Sequence[str],
aligned_seq_b: Sequence[str],
match_score: float = 1.0,
mismatch_score: float = -1.0,
indel_score: float = -1.0,
gap: str = "-",
) -> float:
"""Compute the alignment score for the pair of sequences.
Args:
aligned_seq_a: First aligned sequence.
aligned_seq_b: Second aligned sequence.
match_score: Score to apply for transitions where the sequences match each other at a given
index. Defaults to 1.
mismatch_score: Score to apply for transitions where the sequences do _not_ match each other
at a given index. Defaults to -1.
indel_score: Score to apply for insertion/deletion transitions where one sequence advances
without the other advancing (thus inserting a gap). Defaults to -1.
gap: Value to use for marking gaps in the aligned sequences. Defaults to "-".
Returns:
Needleman-Wunsch alignment score representing the sum of match, mismatch and
insertion/deletion transition scores for the provided alignment.
Note:
See `NWScore()` function at
https://en.wikipedia.org/wiki/Needleman%E2%80%93Wunsch_algorithm for more information.
"""
# First, map the sequences to integers
_, aligned_seq_a_indices, aligned_seq_b_indices = _entry2idx(
aligned_seq_a, aligned_seq_b, gap, allow_gap=True
)

# Now, get the score
return float(
_sequence_align.alignment_score(
aligned_seq_a_indices,
aligned_seq_b_indices,
match_score=match_score,
mismatch_score=mismatch_score,
indel_score=indel_score,
gap_val=_GAP_VAL,
)
)
46 changes: 46 additions & 0 deletions tests/unit/test_alignment_score.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright 2023-present Kensho Technologies, LLC.
import unittest

from sequence_align.pairwise import alignment_score


# Try something non-default
DEFAULT_GAP = "?"


class TestAlignmentScore(unittest.TestCase):
def test_empty(self) -> None:
"""Score of two empty sequences should always be zero."""
self.assertEqual(alignment_score([], []), 0)

def test_unequal(self) -> None:
"""Should fail with sequences of different length."""
with self.assertRaises(ValueError):
alignment_score(["A", "B", "C"], ["D", "E"])

def test_normal(self) -> None:
"""Score of two nonempty sequences should match and in both directions."""
# Should be one insertion, one match, and one mismatch based on the scores
seq_a = ["A", "B", "C"]
seq_b = [DEFAULT_GAP, "B", "D"]
match_score = 1.234
mismatch_score = -1.234
indel_score = -1.012

expected_score = match_score + mismatch_score + indel_score

for ab_order in [False, True]:
seq_a_proc = seq_a if ab_order else seq_b
seq_b_proc = seq_b if ab_order else seq_a
with self.subTest(ab_order=ab_order):
self.assertEqual(
alignment_score(
seq_a_proc,
seq_b_proc,
match_score=match_score,
mismatch_score=mismatch_score,
indel_score=indel_score,
gap=DEFAULT_GAP,
),
expected_score,
)

0 comments on commit 782f0f8

Please sign in to comment.