From 39c18f51a68b108c94e567d57efc92b685b75b5d Mon Sep 17 00:00:00 2001 From: Mario Picciani Date: Wed, 7 Aug 2024 00:37:08 +0200 Subject: [PATCH] rm compute_ion_mass function + custom mod cleanup --- .../annotation/annotation.py | 16 +- spectrum_fundamentals/constants.py | 29 +--- spectrum_fundamentals/fragments.py | 152 +++--------------- spectrum_fundamentals/mod_string.py | 8 +- tests/unit_tests/test_annotation.py | 20 +++ tests/unit_tests/test_constants.py | 21 --- tests/unit_tests/test_mod_string.py | 2 +- 7 files changed, 55 insertions(+), 193 deletions(-) delete mode 100644 tests/unit_tests/test_constants.py diff --git a/spectrum_fundamentals/annotation/annotation.py b/spectrum_fundamentals/annotation/annotation.py index ad53432..8be5407 100644 --- a/spectrum_fundamentals/annotation/annotation.py +++ b/spectrum_fundamentals/annotation/annotation.py @@ -121,7 +121,7 @@ def annotate_spectra( un_annot_spectra: pd.DataFrame, mass_tolerance: Optional[float] = None, unit_mass_tolerance: Optional[str] = None, - custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None, + custom_mods: Optional[Dict[str, float]] = None, fragmentation_method: str = "HCD", ) -> pd.DataFrame: """ @@ -142,7 +142,7 @@ def annotate_spectra( :param mass_tolerance: mass tolerance to calculate min and max mass :param unit_mass_tolerance: unit for the mass tolerance (da or ppm) :param fragmentation_method: fragmentation method that was used - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :return: a Pandas DataFrame containing the annotated spectra with meta data """ raw_file_annotations = [] @@ -345,7 +345,7 @@ def parallel_annotate( index_columns: Dict[str, int], mass_tolerance: Optional[float] = None, unit_mass_tolerance: Optional[str] = None, - custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None, + custom_mods: Optional[Dict[str, float]] = None, fragmentation_method: str = "HCD", ) -> Optional[ Union[ @@ -367,7 +367,7 @@ def parallel_annotate( :param index_columns: a dictionary that contains the index columns of the spectrum :param mass_tolerance: mass tolerance to calculate min and max mass :param unit_mass_tolerance: unit for the mass tolerance (da or ppm) - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :param fragmentation_method: fragmentation method that was used :return: a tuple containing intensity values (np.ndarray), masses (np.ndarray), calculated mass (float), and any removed peaks (List[str]) @@ -397,7 +397,7 @@ def _annotate_linear_spectrum( index_columns: Dict[str, int], mass_tolerance: Optional[float], unit_mass_tolerance: Optional[str], - custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None, + custom_mods: Optional[Dict[str, float]] = None, fragmentation_method: str = "HCD", ): """ @@ -407,7 +407,7 @@ def _annotate_linear_spectrum( :param index_columns: Index columns of the spectrum :param mass_tolerance: Mass tolerance for calculating min and max mass :param unit_mass_tolerance: Unit for the mass tolerance (da or ppm) - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :param fragmentation_method: fragmentation method that was used :return: Annotated spectrum """ @@ -454,7 +454,7 @@ def _annotate_crosslinked_spectrum( crosslinker_type: str, mass_tolerance: Optional[float] = None, unit_mass_tolerance: Optional[str] = None, - custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None, + custom_mods: Optional[Dict[str, float]] = None, ): """ Annotate a crosslinked peptide spectrum. @@ -464,7 +464,7 @@ def _annotate_crosslinked_spectrum( :param crosslinker_type: Type of crosslinker used :param mass_tolerance: Mass tolerance for calculating min and max mass :param unit_mass_tolerance: Unit for the mass tolerance (da or ppm) - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :raises ValueError: if unsupported crosslinker type was supplied. :return: Annotated spectrum diff --git a/spectrum_fundamentals/constants.py b/spectrum_fundamentals/constants.py index c2531f6..bc72e3f 100644 --- a/spectrum_fundamentals/constants.py +++ b/spectrum_fundamentals/constants.py @@ -1,5 +1,4 @@ from enum import Enum -from typing import Dict, List, Optional, Tuple import numpy as np @@ -235,32 +234,6 @@ } -def update_mod_masses(mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None) -> Dict[str, float]: - """ - Function to update MOD_MASSES with custom modifications. - - :param mods: Modifications with respective mass - :raises AssertionError: if mass of modification was not provided as float. - :return: updated MOD_MASSES - """ - mod_masses = {} - if mods: - try: - stat_mods: List[Tuple[str, float]] = [ - (value[0], float(value[1])) for key, value in (mods.get("stat_mods") or {}).items() - ] - var_mods: List[Tuple[str, float]] = [ - (value[0], float(value[1])) for key, value in (mods.get("var_mods") or {}).items() - ] - except ValueError as e: - raise AssertionError("All custom modifications value entries must be of type Tuple[str, float]") from e - - for value in stat_mods + var_mods: - mod_masses[value[0]] = float(value[1]) - - return MOD_MASSES | mod_masses - - MOD_MASSES_SAGE = { "229.1629": "[UNIMOD:737]", "304.2071": "[UNIMOD:2016]", @@ -276,6 +249,8 @@ def update_mod_masses(mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = "42.0105": "[UNIMOD:1]", } # these are only used for prosit_grpc, oktoberfest uses the masses from MOD_MASSES + + AA_MOD_MASSES = { "K[UNIMOD:737]": AA_MASSES["K"] + MOD_MASSES["[UNIMOD:737]"], "M[UNIMOD:35]": AA_MASSES["M"] + MOD_MASSES["[UNIMOD:35]"], diff --git a/spectrum_fundamentals/fragments.py b/spectrum_fundamentals/fragments.py index f6d2681..3d6d3b9 100644 --- a/spectrum_fundamentals/fragments.py +++ b/spectrum_fundamentals/fragments.py @@ -6,15 +6,13 @@ import numpy as np import pandas as pd -from . import constants as constants +from .constants import AA_MASSES, ATOM_MASSES, MOD_MASSES, PARTICLE_MASSES from .mod_string import internal_without_mods logger = logging.getLogger(__name__) -def _get_modifications( - peptide_sequence: str, custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None -) -> Dict[int, float]: +def _get_modifications(peptide_sequence: str, custom_mods: Optional[Dict[str, float]] = None) -> Dict[int, float]: """ Get modification masses and position in a peptide sequence. @@ -26,7 +24,7 @@ def _get_modifications( modification was present are returned. :param peptide_sequence: Modified peptide sequence - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :return: modification_deltas """ modification_deltas = {} @@ -40,18 +38,17 @@ def _get_modifications( pattern = re.compile(r"\[.{8}[^\]]*\]") matches = pattern.finditer(peptide_sequence) - mod_masses = constants.update_mod_masses(mods=custom_mods) + mod_masses = MOD_MASSES | (custom_mods or {}) for match in matches: - start_pos = match.start() - end_pos = match.end() - modification_deltas[start_pos - offset] = mod_masses[peptide_sequence[start_pos:end_pos]] + start_pos, end_pos = match.span() + modification_deltas[start_pos - offset] = mod_masses[match.group()] offset += end_pos - start_pos return modification_deltas -def compute_peptide_mass(sequence: str, custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None) -> float: +def compute_peptide_mass(sequence: str, custom_mods: Optional[Dict[str, float]] = None) -> float: """ Compute the theoretical mass of the peptide sequence. @@ -59,14 +56,14 @@ def compute_peptide_mass(sequence: str, custom_mods: Optional[Dict[str, Dict[str :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass :return: Theoretical mass of the sequence """ - terminal_masses = 2 * constants.ATOM_MASSES["H"] + constants.ATOM_MASSES["O"] # add terminal masses HO- and H- + terminal_masses = 2 * ATOM_MASSES["H"] + ATOM_MASSES["O"] # add terminal masses HO- and H- modification_deltas = _get_modifications(sequence, custom_mods=custom_mods) if modification_deltas: # there were modifictions sequence = internal_without_mods([sequence])[0] terminal_masses += modification_deltas.get(-2, 0.0) # prime with n_term_mod delta if present - peptide_sum = sum([constants.AA_MASSES[c] + modification_deltas.get(i, 0.0) for i, c in enumerate(sequence)]) + peptide_sum = sum([AA_MASSES[c] + modification_deltas.get(i, 0.0) for i, c in enumerate(sequence)]) return terminal_masses + peptide_sum @@ -141,12 +138,12 @@ def get_ion_delta(ion_types: List[str]) -> np.ndarray: :return: numpy array with masses of the ions """ ion_type_offsets = { - "a": -constants.ATOM_MASSES["O"] - constants.ATOM_MASSES["C"], + "a": -ATOM_MASSES["O"] - ATOM_MASSES["C"], "b": 0.0, - "c": 3 * constants.ATOM_MASSES["H"] + constants.ATOM_MASSES["N"], - "x": 2 * constants.ATOM_MASSES["O"] + constants.ATOM_MASSES["C"], - "y": constants.ATOM_MASSES["O"] + 2 * constants.ATOM_MASSES["H"], - "z": constants.ATOM_MASSES["O"] - constants.ATOM_MASSES["N"] - constants.ATOM_MASSES["H"], + "c": 3 * ATOM_MASSES["H"] + ATOM_MASSES["N"], + "x": 2 * ATOM_MASSES["O"] + ATOM_MASSES["C"], + "y": ATOM_MASSES["O"] + 2 * ATOM_MASSES["H"], + "z": ATOM_MASSES["O"] - ATOM_MASSES["N"] - ATOM_MASSES["H"], } # I think list comprehension is fastest way deltas = np.array([ion_type_offsets[ion_type] for ion_type in ion_types]).reshape(len(ion_types), 1) @@ -164,7 +161,7 @@ def initialize_peaks( peptide_beta_mass: float = 0.0, xl_pos: int = -1, fragmentation_method: str = "HCD", - custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None, + custom_mods: Optional[Dict[str, float]] = None, ) -> Tuple[List[dict], int, str, float]: """ Generate theoretical peaks for a modified peptide sequence. @@ -178,7 +175,7 @@ def initialize_peaks( :param peptide_beta_mass: the mass of the second peptide to be considered for non-cleavable XL :param xl_pos: the position of the crosslinker for non-cleavable XL :param fragmentation_method: fragmentation method that was used - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :return: List of theoretical peaks, Flag to indicate if there is a tmt on n-terminus, Un modified peptide sequence """ _xl_sanity_check(noncl_xl, peptide_beta_mass, xl_pos) @@ -205,7 +202,7 @@ def initialize_peaks( # add n_term mass to first aa for easy processing in the following calculation modification_deltas[0] = modification_deltas.get(0, 0.0) + n_term_delta - mass_arr = np.array([constants.AA_MASSES[_] for _ in sequence]) + mass_arr = np.array([AA_MASSES[_] for _ in sequence]) for pos, mod_mass in modification_deltas.items(): mass_arr[pos] += mod_mass @@ -224,7 +221,7 @@ def initialize_peaks( # calculate for m/z for charges 1, 2, 3 # shape of ion_mzs: (n_ions, n_fragments, max_charge) charges = np.arange(1, max_charge + 1) - ion_mzs = (sum_array[..., np.newaxis] + charges * constants.PARTICLE_MASSES["PROTON"]) / charges + ion_mzs = (sum_array[..., np.newaxis] + charges * PARTICLE_MASSES["PROTON"]) / charges min_mzs, max_mzs = get_min_max_mass(mass_analyzer, ion_mzs, mass_tolerance, unit_mass_tolerance) @@ -249,7 +246,7 @@ def initialize_peaks( fragments_meta_data, n_term_mod, sequence, - (peptide_mass + constants.ATOM_MASSES["O"] + 2 * constants.ATOM_MASSES["H"]), + (peptide_mass + ATOM_MASSES["O"] + 2 * ATOM_MASSES["H"]), ) @@ -261,7 +258,7 @@ def initialize_peaks_xl( mass_tolerance: Optional[float] = None, unit_mass_tolerance: Optional[str] = None, sequence_beta: Optional[str] = None, - custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None, + custom_mods: Optional[Dict[str, float]] = None, ) -> Tuple[List[dict], int, str, float]: """ Generate theoretical peaks for a modified (potentially cleavable cross-linked) peptide sequence. @@ -275,7 +272,7 @@ def initialize_peaks_xl( :param mass_tolerance: mass tolerance to calculate min and max mass :param unit_mass_tolerance: unit for the mass tolerance (da or ppm) :param sequence_beta: optional second peptide to be considered for non-cleavable XL - :param custom_mods: Custom Modifications with the identifier, the unimod equivalent and the respective mass + :param custom_mods: mapping of custom UNIMOD string identifiers ('[UNIMOD:xyz]') to their mass :raises ValueError: if crosslinker_type is unkown :raises AssertionError: if the short and long XL sequence (the one with the short / long crosslinker mod) has a tmt n term while the other one does not @@ -425,110 +422,3 @@ def get_min_max_mass( else: raise ValueError(f"Unsupported mass_analyzer: {mass_analyzer}") return (min_mass, max_mass) - - -def compute_ion_masses(seq_int: List[int], charge_onehot: List[int], tmt: str = "") -> Optional[np.ndarray]: - """ - Collects an integer sequence e.g. [1,2,3] with charge 2 and returns array with 174 positions for ion masses. - - Invalid masses are set to -1. - - :param seq_int: TODO - :param charge_onehot: is a onehot representation of charge with 6 elems for charges 1 to 6 - :param tmt: TODO - :return: list of masses as floats - """ - charge = list(charge_onehot).index(1) + 1 - if not (charge in (1, 2, 3, 4, 5, 6) and len(charge_onehot) == 6): - print("[ERROR] One-hot-enconded Charge is not in valid range 1 to 6") - return None - - if not len(seq_int) == constants.SEQ_LEN: - print(f"[ERROR] Sequence length {len(seq_int)} is not desired length of {constants.SEQ_LEN}") - return None - - idx = list(seq_int).index(0) if 0 in seq_int else constants.SEQ_LEN - masses = np.ones((constants.SEQ_LEN - 1) * 2 * 3, dtype=np.float32) * -1 - mass_b = 0 - mass_y = 0 - j = 0 # iterate over masses - - # Iterate over sequence, sequence should have length 30 - for i in range(idx - 1): # only 29 possible ions - j = i * 6 # index for masses array at position - - # MASS FOR Y IONS - # print("Added", constants.VEC_MZ[seq_int[l-1-i]]) - mass_y += constants.VEC_MZ[seq_int[idx - 1 - i]] - - # Compute charge +1 - masses[j] = ( - mass_y - + 1 * constants.PARTICLE_MASSES["PROTON"] - + constants.MASSES["C_TERMINUS"] - + constants.ATOM_MASSES["H"] - ) / 1.0 - # Compute charge +2 - masses[j + 1] = ( - ( - mass_y - + 2 * constants.PARTICLE_MASSES["PROTON"] - + constants.MASSES["C_TERMINUS"] - + constants.ATOM_MASSES["H"] - ) - / 2.0 - if charge >= 2 - else -1.0 - ) - # Compute charge +3 - masses[j + 2] = ( - ( - mass_y - + 3 * constants.PARTICLE_MASSES["PROTON"] - + constants.MASSES["C_TERMINUS"] - + constants.ATOM_MASSES["H"] - ) - / 3.0 - if charge >= 3.0 - else -1.0 - ) - - # MASS FOR B IONS - if i == 0 and tmt != "": - mass_b += constants.VEC_MZ[seq_int[i]] + constants.MOD_MASSES[constants.TMT_MODS[tmt]] - else: - mass_b += constants.VEC_MZ[seq_int[i]] - - # Compute charge +1 - masses[j + 3] = ( - mass_b - + 1 * constants.PARTICLE_MASSES["PROTON"] - + constants.MASSES["N_TERMINUS"] - - constants.ATOM_MASSES["H"] - ) / 1.0 - # Compute charge +2 - masses[j + 4] = ( - ( - mass_b - + 2 * constants.PARTICLE_MASSES["PROTON"] - + constants.MASSES["N_TERMINUS"] - - constants.ATOM_MASSES["H"] - ) - / 2.0 - if charge >= 2 - else -1.0 - ) - # Compute charge +3 - masses[j + 5] = ( - ( - mass_b - + 3 * constants.PARTICLE_MASSES["PROTON"] - + constants.MASSES["N_TERMINUS"] - - constants.ATOM_MASSES["H"] - ) - / 3.0 - if charge >= 3.0 - else -1.0 - ) - - return masses diff --git a/spectrum_fundamentals/mod_string.py b/spectrum_fundamentals/mod_string.py index 2016c02..99b3702 100644 --- a/spectrum_fundamentals/mod_string.py +++ b/spectrum_fundamentals/mod_string.py @@ -6,7 +6,7 @@ import numpy as np import pandas as pd -from .constants import MOD_NAMES, SPECTRONAUT_MODS, XISEARCH_VAR_MODS, update_mod_masses +from .constants import MOD_MASSES, MOD_NAMES, SPECTRONAUT_MODS, XISEARCH_VAR_MODS def sage_to_internal(sequences: List[str], mods: Dict[str, str]) -> List[str]: @@ -200,9 +200,7 @@ def internal_without_mods(sequences: List[str]) -> List[str]: return [re.sub(regex, "", seq) for seq in sequences] -def internal_to_mod_mass( - sequences: List[str], custom_mods: Optional[Dict[str, Dict[str, Tuple[str, float]]]] = None -) -> List[str]: +def internal_to_mod_mass(sequences: List[str], custom_mods: Optional[Dict[str, float]] = None) -> List[str]: """ Function to exchange the internal mod identifiers with the masses of the specific modifiction. @@ -210,7 +208,7 @@ def internal_to_mod_mass( :param custom_mods: custom mods with the identifier (=key), respespective unimod identifier and mass (value) :return: List[str] of modified sequences """ - mod_masses = update_mod_masses(custom_mods) + mod_masses = MOD_MASSES | (custom_mods or {}) regex = re.compile("(%s)" % "|".join(map(re.escape, mod_masses.keys()))) replacement_func = lambda match: f"[+{mod_masses[match.string[match.start():match.end()]]}]" diff --git a/tests/unit_tests/test_annotation.py b/tests/unit_tests/test_annotation.py index 5722f37..f888031 100644 --- a/tests/unit_tests/test_annotation.py +++ b/tests/unit_tests/test_annotation.py @@ -30,6 +30,26 @@ def test_annotate_spectra(self): result = annotation.annotate_spectra(spectrum_input) pd.testing.assert_frame_equal(expected_result, result) + def test_annotate_spectra_with_custom_mods(self): + """Test annotate spectra.""" + spectrum_input = pd.read_csv( + Path(__file__).parent / "data/spectrum_input.csv", + index_col=0, + converters={"INTENSITIES": literal_eval, "MZ": literal_eval}, + ) + + expected_result = pd.read_csv( + Path(__file__).parent / "data/spectrum_output.csv", + index_col=0, + converters={"INTENSITIES": literal_eval, "MZ": literal_eval}, + ) + spectrum_input["INTENSITIES"] = spectrum_input["INTENSITIES"].map(lambda intensities: np.array(intensities)) + spectrum_input["MZ"] = spectrum_input["MZ"].map(lambda mz: np.array(mz)) + custom_mods = {"[UNIMOD:4]": 57.0215, "[UNIMOD:35]": 15.99} + + result = annotation.annotate_spectra(un_annot_spectra=spectrum_input, custom_mods=custom_mods) + pd.testing.assert_frame_equal(expected_result, result) + def test_annotate_spectra_noncl_xl(self): """Test annotate spectra non cleavable crosslinked peptides.""" spectrum_input = pd.read_json( diff --git a/tests/unit_tests/test_constants.py b/tests/unit_tests/test_constants.py deleted file mode 100644 index b9479fe..0000000 --- a/tests/unit_tests/test_constants.py +++ /dev/null @@ -1,21 +0,0 @@ -import unittest - -import spectrum_fundamentals.constants as c - - -class UpdateModMasses(unittest.TestCase): - """Class to test updating mod masses.""" - - def test_update_mod_masses(self): - """Test _update_mod_masses.""" - custom_mods = {"stat_mods": {"57.0215": ("[UNIMOD:737]", 229.1628)}} - updated_mods = c.MOD_MASSES.copy() - updated_mods.update({"[UNIMOD:737]": 229.1628}) - self.assertEqual(c.update_mod_masses(custom_mods), updated_mods) - - """ - def test_update_mod_masses_error(self): - ""Test update_mod_masses."" - custom_mods = {"stat_mods": {"57.0215": ("[UNIMOD:275]", "MOD")}} - self.assertRaises(AssertionError, c.update_mod_masses, custom_mods) - """ diff --git a/tests/unit_tests/test_mod_string.py b/tests/unit_tests/test_mod_string.py index 902412b..cee6c5c 100644 --- a/tests/unit_tests/test_mod_string.py +++ b/tests/unit_tests/test_mod_string.py @@ -241,7 +241,7 @@ def test_internal_to_mod_masses(self): def test_internal_to_mod_masses_custom(self): """Test internal with custom mods to internal without_mods.""" - mods = {"stat_mods": {"57.0215": ("[UNIMOD:977]", 57.0215)}} + mods = {"[UNIMOD:977]": 57.0215} self.assertEqual( mod.internal_to_mod_mass(["[UNIMOD:737]-ABC[UNIMOD:977]DEFGHK[UNIMOD:737]"], mods), ["[+229.162932]-ABC[+57.0215]DEFGHK[+229.162932]"],