Skip to content

Commit

Permalink
making parallel_annotate() less complex
Browse files Browse the repository at this point in the history
  • Loading branch information
Mostafa Kalhor committed Apr 8, 2024
1 parent af1cf68 commit ad50cc0
Showing 1 changed file with 141 additions and 119 deletions.
260 changes: 141 additions & 119 deletions spectrum_fundamentals/annotation/annotation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -348,10 +348,11 @@ def parallel_annotate(
"""
Perform parallel annotation of a spectrum.
This function takes a spectrum and its index columns and performs parallel annotation of the spectrum. It starts by
initializing the peaks and extracting necessary data from the spectrum. It then matches the peaks to the spectrum and
generates an annotation matrix based on the matched peaks. If there are multiple matches found, it removes the redundant
matches. Finally, it returns annotated spectrum with meta data including intensity values, masses, calculated masses,
This function takes a spectrum and its index columns and performs parallel annotation of the spectrum.
It starts by initializing the peaks and extracting necessary data from the spectrum.
It then matches the peaks to the spectrum and generates an annotation matrix based on the matched peaks.
If there are multiple matches found, it removes the redundant matches.
Finally, it returns annotated spectrum with meta data including intensity values, masses, calculated masses,
and any peaks that were removed. The function is designed to run in different threads to speed up the annotation pipeline.
:param spectrum: a np.ndarray that contains the spectrum to be annotated
Expand All @@ -366,122 +367,143 @@ def parallel_annotate(
if "MODIFIED_SEQUENCE_MSA" in index_columns:
mod_seq_column = "MODIFIED_SEQUENCE_MSA"

if "CROSSLINKER_TYPE" in index_columns:
crosslinker_type = spectrum[index_columns["CROSSLINKER_TYPE"]]
else:
crosslinker_type = None

if crosslinker_type is not None:
inputs_a = [
spectrum[index_columns["MODIFIED_SEQUENCE_A"]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["CROSSLINKER_POSITION_A"]],
spectrum[index_columns["CROSSLINKER_TYPE"]],
mass_tolerance,
unit_mass_tolerance,
]
inputs_b = [
spectrum[index_columns["MODIFIED_SEQUENCE_B"]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["CROSSLINKER_POSITION_B"]],
spectrum[index_columns["CROSSLINKER_TYPE"]],
mass_tolerance,
unit_mass_tolerance,
]
if crosslinker_type in ["BS3", "DSS"]: # non cleavable XL
array_size = 174
inputs_a.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]])
inputs_b.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]])
matrix_func: Callable[[Any, str, int], Tuple[np.ndarray, np.ndarray]] = generate_annotation_matrix
elif crosslinker_type in ["DSSO", "DSBU", "BUURBU"]:
array_size = 348
matrix_func = generate_annotation_matrix_xl
else:
raise ValueError(f"Unsupported crosslinker type provided: {crosslinker_type}")

fragments_meta_data_a, tmt_n_term_a, unmod_sequence_a, calc_mass_a = initialize_peaks_xl(*inputs_a)
fragments_meta_data_b, tmt_n_term_b, unmod_sequence_b, calc_mass_b = initialize_peaks_xl(*inputs_b)

if not unmod_sequence_a:
return None
if not unmod_sequence_b:
return None

matched_peaks_a = match_peaks(
fragments_meta_data_a,
spectrum[index_columns["INTENSITIES"]],
spectrum[index_columns["MZ"]],
tmt_n_term_a,
unmod_sequence_a,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)
crosslinker_type = index_columns.get("CROSSLINKER_TYPE")
if crosslinker_type is None:
return _annotate_linear_spectrum(spectrum, index_columns, mod_seq_column, mass_tolerance, unit_mass_tolerance)

matched_peaks_b = match_peaks(
fragments_meta_data_b,
spectrum[index_columns["INTENSITIES"]],
spectrum[index_columns["MZ"]],
tmt_n_term_b,
unmod_sequence_b,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)
return _annotate_crosslinked_spectrum(
spectrum, index_columns, crosslinker_type, mod_seq_column, mass_tolerance, unit_mass_tolerance
)

if len(matched_peaks_a) == 0:
intensities_a = np.full(array_size, 0.0)
mass_a = np.full(array_size, 0.0)
removed_peaks_a = 0
else:
matched_peaks_a, removed_peaks_a = handle_multiple_matches(matched_peaks_a)
intensities_a, mass_a = matrix_func(
matched_peaks_a, unmod_sequence_a, spectrum[index_columns["CROSSLINKER_POSITION_A"]]
)

if len(matched_peaks_b) == 0:
intensities_b = np.full(array_size, 0.0)
mass_b = np.full(array_size, 0.0)
removed_peaks_b = 0
else:
matched_peaks_b, removed_peaks_b = handle_multiple_matches(matched_peaks_b)
intensities_b, mass_b = matrix_func(
matched_peaks_b, unmod_sequence_b, spectrum[index_columns["CROSSLINKER_POSITION_B"]]
)

return (
intensities_a,
intensities_b,
mass_a,
mass_b,
calc_mass_a,
calc_mass_b,
removed_peaks_a,
removed_peaks_b,
)

def _annotate_linear_spectrum(spectrum, index_columns, mod_seq_column, mass_tolerance, unit_mass_tolerance):
"""
Annotate a linear peptide spectrum.
:param spectrum: Spectrum to be annotated
:param index_columns: Index columns of the spectrum
:param mod_seq_column: Modified sequence column
:param mass_tolerance: Mass tolerance for calculating min and max mass
:param unit_mass_tolerance: Unit for the mass tolerance (da or ppm)
:return: Annotated spectrum
"""
fragments_meta_data, tmt_n_term, unmod_sequence, calc_mass = initialize_peaks(
spectrum[index_columns[mod_seq_column]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["PRECURSOR_CHARGE"]],
mass_tolerance,
unit_mass_tolerance,
)
if not unmod_sequence:
return None
matched_peaks = match_peaks(
fragments_meta_data,
spectrum[index_columns["INTENSITIES"]],
spectrum[index_columns["MZ"]],
tmt_n_term,
unmod_sequence,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)

if len(matched_peaks) == 0:
intensity = np.full(174, 0.0)
mass = np.full(174, 0.0)
return intensity, mass, calc_mass, 0

matched_peaks, removed_peaks = handle_multiple_matches(matched_peaks)
intensities, mass = generate_annotation_matrix(
matched_peaks, unmod_sequence, spectrum[index_columns["PRECURSOR_CHARGE"]]
)
return intensities, mass, calc_mass, removed_peaks


def _annotate_crosslinked_spectrum(
spectrum, index_columns, crosslinker_type, mod_seq_column, mass_tolerance, unit_mass_tolerance
):
"""
Annotate a crosslinked peptide spectrum.
:param spectrum: Spectrum to be annotated
:param index_columns: Index columns of the spectrum
:param crosslinker_type: Type of crosslinker used
:param mod_seq_column: Modified sequence column
:param mass_tolerance: Mass tolerance for calculating min and max mass
:param unit_mass_tolerance: Unit for the mass tolerance (da or ppm)
:return: Annotated spectrum
"""
crosslinker_type_index = index_columns.get("CROSSLINKER_TYPE")
if crosslinker_type_index is not None:
crosslinker_type = spectrum[crosslinker_type_index]
else:
raise ValueError("Crosslinker type column not found in index_columns.")

inputs_a = [
spectrum[index_columns["MODIFIED_SEQUENCE_A"]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["CROSSLINKER_POSITION_A"]],
spectrum[index_columns["CROSSLINKER_TYPE"]],
mass_tolerance,
unit_mass_tolerance,
]
inputs_b = [
spectrum[index_columns["MODIFIED_SEQUENCE_B"]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["CROSSLINKER_POSITION_B"]],
spectrum[index_columns["CROSSLINKER_TYPE"]],
mass_tolerance,
unit_mass_tolerance,
]
if crosslinker_type in ["BS3", "DSS"]: # non cleavable XL
array_size = 174
inputs_a.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]])
inputs_b.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]])
matrix_func = generate_annotation_matrix
elif crosslinker_type in ["DSSO", "DSBU", "BUURBU"]:
array_size = 348
matrix_func = generate_annotation_matrix_xl
else:
raise ValueError(f"Unsupported crosslinker type provided: {crosslinker_type}")

fragments_meta_data_a, tmt_n_term_a, unmod_sequence_a, calc_mass_a = initialize_peaks_xl(*inputs_a)
fragments_meta_data_b, tmt_n_term_b, unmod_sequence_b, calc_mass_b = initialize_peaks_xl(*inputs_b)

if not unmod_sequence_a or not unmod_sequence_b:
return None

matched_peaks_a = match_peaks(
fragments_meta_data_a,
spectrum[index_columns["INTENSITIES"]],
spectrum[index_columns["MZ"]],
tmt_n_term_a,
unmod_sequence_a,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)

matched_peaks_b = match_peaks(
fragments_meta_data_b,
spectrum[index_columns["INTENSITIES"]],
spectrum[index_columns["MZ"]],
tmt_n_term_b,
unmod_sequence_b,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)

intensities_a, mass_a, removed_peaks_a = _process_matched_peaks(
matched_peaks_a, unmod_sequence_a, array_size, matrix_func, spectrum[index_columns["CROSSLINKER_POSITION_A"]]
)
intensities_b, mass_b, removed_peaks_b = _process_matched_peaks(
matched_peaks_b, unmod_sequence_b, array_size, matrix_func, spectrum[index_columns["CROSSLINKER_POSITION_B"]]
)

return intensities_a, intensities_b, mass_a, mass_b, calc_mass_a, calc_mass_b, removed_peaks_a, removed_peaks_b


def _process_matched_peaks(matched_peaks, unmod_sequence, array_size, matrix_func, crosslinker_position):
if len(matched_peaks) == 0:
intensities = np.full(array_size, 0.0)
mass = np.full(array_size, 0.0)
removed_peaks = 0
else:
fragments_meta_data, tmt_n_term, unmod_sequence, calc_mass = initialize_peaks(
spectrum[index_columns[mod_seq_column]],
spectrum[index_columns["MASS_ANALYZER"]],
spectrum[index_columns["PRECURSOR_CHARGE"]],
mass_tolerance,
unit_mass_tolerance,
)
if not unmod_sequence:
return None
matched_peaks = match_peaks(
fragments_meta_data,
spectrum[index_columns["INTENSITIES"]],
spectrum[index_columns["MZ"]],
tmt_n_term,
unmod_sequence,
spectrum[index_columns["PRECURSOR_CHARGE"]],
)

if len(matched_peaks) == 0:
intensity = np.full(174, 0.0)
mass = np.full(174, 0.0)
return intensity, mass, calc_mass, 0

matched_peaks, removed_peaks = handle_multiple_matches(matched_peaks)
intensities, mass = generate_annotation_matrix(
matched_peaks, unmod_sequence, spectrum[index_columns["PRECURSOR_CHARGE"]]
)
return intensities, mass, calc_mass, removed_peaks
intensities, mass = matrix_func(matched_peaks, unmod_sequence, crosslinker_position)
return intensities, mass, removed_peaks

0 comments on commit ad50cc0

Please sign in to comment.