From ad50cc0a2f8db9695c5a9c7b8dec19a8246466ad Mon Sep 17 00:00:00 2001 From: Mostafa Kalhor Date: Mon, 8 Apr 2024 16:56:36 +0200 Subject: [PATCH] making parallel_annotate() less complex --- .../annotation/annotation.py | 260 ++++++++++-------- 1 file changed, 141 insertions(+), 119 deletions(-) diff --git a/spectrum_fundamentals/annotation/annotation.py b/spectrum_fundamentals/annotation/annotation.py index 0b25cf1..9e5d260 100644 --- a/spectrum_fundamentals/annotation/annotation.py +++ b/spectrum_fundamentals/annotation/annotation.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union import numpy as np import pandas as pd @@ -348,10 +348,11 @@ def parallel_annotate( """ Perform parallel annotation of a spectrum. - This function takes a spectrum and its index columns and performs parallel annotation of the spectrum. It starts by - initializing the peaks and extracting necessary data from the spectrum. It then matches the peaks to the spectrum and - generates an annotation matrix based on the matched peaks. If there are multiple matches found, it removes the redundant - matches. Finally, it returns annotated spectrum with meta data including intensity values, masses, calculated masses, + This function takes a spectrum and its index columns and performs parallel annotation of the spectrum. + It starts by initializing the peaks and extracting necessary data from the spectrum. + It then matches the peaks to the spectrum and generates an annotation matrix based on the matched peaks. + If there are multiple matches found, it removes the redundant matches. + Finally, it returns annotated spectrum with meta data including intensity values, masses, calculated masses, and any peaks that were removed. The function is designed to run in different threads to speed up the annotation pipeline. :param spectrum: a np.ndarray that contains the spectrum to be annotated @@ -366,122 +367,143 @@ def parallel_annotate( if "MODIFIED_SEQUENCE_MSA" in index_columns: mod_seq_column = "MODIFIED_SEQUENCE_MSA" - if "CROSSLINKER_TYPE" in index_columns: - crosslinker_type = spectrum[index_columns["CROSSLINKER_TYPE"]] - else: - crosslinker_type = None - - if crosslinker_type is not None: - inputs_a = [ - spectrum[index_columns["MODIFIED_SEQUENCE_A"]], - spectrum[index_columns["MASS_ANALYZER"]], - spectrum[index_columns["CROSSLINKER_POSITION_A"]], - spectrum[index_columns["CROSSLINKER_TYPE"]], - mass_tolerance, - unit_mass_tolerance, - ] - inputs_b = [ - spectrum[index_columns["MODIFIED_SEQUENCE_B"]], - spectrum[index_columns["MASS_ANALYZER"]], - spectrum[index_columns["CROSSLINKER_POSITION_B"]], - spectrum[index_columns["CROSSLINKER_TYPE"]], - mass_tolerance, - unit_mass_tolerance, - ] - if crosslinker_type in ["BS3", "DSS"]: # non cleavable XL - array_size = 174 - inputs_a.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]]) - inputs_b.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]]) - matrix_func: Callable[[Any, str, int], Tuple[np.ndarray, np.ndarray]] = generate_annotation_matrix - elif crosslinker_type in ["DSSO", "DSBU", "BUURBU"]: - array_size = 348 - matrix_func = generate_annotation_matrix_xl - else: - raise ValueError(f"Unsupported crosslinker type provided: {crosslinker_type}") - - fragments_meta_data_a, tmt_n_term_a, unmod_sequence_a, calc_mass_a = initialize_peaks_xl(*inputs_a) - fragments_meta_data_b, tmt_n_term_b, unmod_sequence_b, calc_mass_b = initialize_peaks_xl(*inputs_b) - - if not unmod_sequence_a: - return None - if not unmod_sequence_b: - return None - - matched_peaks_a = match_peaks( - fragments_meta_data_a, - spectrum[index_columns["INTENSITIES"]], - spectrum[index_columns["MZ"]], - tmt_n_term_a, - unmod_sequence_a, - spectrum[index_columns["PRECURSOR_CHARGE"]], - ) + crosslinker_type = index_columns.get("CROSSLINKER_TYPE") + if crosslinker_type is None: + return _annotate_linear_spectrum(spectrum, index_columns, mod_seq_column, mass_tolerance, unit_mass_tolerance) - matched_peaks_b = match_peaks( - fragments_meta_data_b, - spectrum[index_columns["INTENSITIES"]], - spectrum[index_columns["MZ"]], - tmt_n_term_b, - unmod_sequence_b, - spectrum[index_columns["PRECURSOR_CHARGE"]], - ) + return _annotate_crosslinked_spectrum( + spectrum, index_columns, crosslinker_type, mod_seq_column, mass_tolerance, unit_mass_tolerance + ) - if len(matched_peaks_a) == 0: - intensities_a = np.full(array_size, 0.0) - mass_a = np.full(array_size, 0.0) - removed_peaks_a = 0 - else: - matched_peaks_a, removed_peaks_a = handle_multiple_matches(matched_peaks_a) - intensities_a, mass_a = matrix_func( - matched_peaks_a, unmod_sequence_a, spectrum[index_columns["CROSSLINKER_POSITION_A"]] - ) - - if len(matched_peaks_b) == 0: - intensities_b = np.full(array_size, 0.0) - mass_b = np.full(array_size, 0.0) - removed_peaks_b = 0 - else: - matched_peaks_b, removed_peaks_b = handle_multiple_matches(matched_peaks_b) - intensities_b, mass_b = matrix_func( - matched_peaks_b, unmod_sequence_b, spectrum[index_columns["CROSSLINKER_POSITION_B"]] - ) - - return ( - intensities_a, - intensities_b, - mass_a, - mass_b, - calc_mass_a, - calc_mass_b, - removed_peaks_a, - removed_peaks_b, - ) +def _annotate_linear_spectrum(spectrum, index_columns, mod_seq_column, mass_tolerance, unit_mass_tolerance): + """ + Annotate a linear peptide spectrum. + + :param spectrum: Spectrum to be annotated + :param index_columns: Index columns of the spectrum + :param mod_seq_column: Modified sequence column + :param mass_tolerance: Mass tolerance for calculating min and max mass + :param unit_mass_tolerance: Unit for the mass tolerance (da or ppm) + :return: Annotated spectrum + """ + fragments_meta_data, tmt_n_term, unmod_sequence, calc_mass = initialize_peaks( + spectrum[index_columns[mod_seq_column]], + spectrum[index_columns["MASS_ANALYZER"]], + spectrum[index_columns["PRECURSOR_CHARGE"]], + mass_tolerance, + unit_mass_tolerance, + ) + if not unmod_sequence: + return None + matched_peaks = match_peaks( + fragments_meta_data, + spectrum[index_columns["INTENSITIES"]], + spectrum[index_columns["MZ"]], + tmt_n_term, + unmod_sequence, + spectrum[index_columns["PRECURSOR_CHARGE"]], + ) + + if len(matched_peaks) == 0: + intensity = np.full(174, 0.0) + mass = np.full(174, 0.0) + return intensity, mass, calc_mass, 0 + + matched_peaks, removed_peaks = handle_multiple_matches(matched_peaks) + intensities, mass = generate_annotation_matrix( + matched_peaks, unmod_sequence, spectrum[index_columns["PRECURSOR_CHARGE"]] + ) + return intensities, mass, calc_mass, removed_peaks + + +def _annotate_crosslinked_spectrum( + spectrum, index_columns, crosslinker_type, mod_seq_column, mass_tolerance, unit_mass_tolerance +): + """ + Annotate a crosslinked peptide spectrum. + + :param spectrum: Spectrum to be annotated + :param index_columns: Index columns of the spectrum + :param crosslinker_type: Type of crosslinker used + :param mod_seq_column: Modified sequence column + :param mass_tolerance: Mass tolerance for calculating min and max mass + :param unit_mass_tolerance: Unit for the mass tolerance (da or ppm) + :return: Annotated spectrum + """ + crosslinker_type_index = index_columns.get("CROSSLINKER_TYPE") + if crosslinker_type_index is not None: + crosslinker_type = spectrum[crosslinker_type_index] + else: + raise ValueError("Crosslinker type column not found in index_columns.") + + inputs_a = [ + spectrum[index_columns["MODIFIED_SEQUENCE_A"]], + spectrum[index_columns["MASS_ANALYZER"]], + spectrum[index_columns["CROSSLINKER_POSITION_A"]], + spectrum[index_columns["CROSSLINKER_TYPE"]], + mass_tolerance, + unit_mass_tolerance, + ] + inputs_b = [ + spectrum[index_columns["MODIFIED_SEQUENCE_B"]], + spectrum[index_columns["MASS_ANALYZER"]], + spectrum[index_columns["CROSSLINKER_POSITION_B"]], + spectrum[index_columns["CROSSLINKER_TYPE"]], + mass_tolerance, + unit_mass_tolerance, + ] + if crosslinker_type in ["BS3", "DSS"]: # non cleavable XL + array_size = 174 + inputs_a.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]]) + inputs_b.append(spectrum[index_columns["MODIFIED_SEQUENCE_B"]]) + matrix_func = generate_annotation_matrix + elif crosslinker_type in ["DSSO", "DSBU", "BUURBU"]: + array_size = 348 + matrix_func = generate_annotation_matrix_xl + else: + raise ValueError(f"Unsupported crosslinker type provided: {crosslinker_type}") + + fragments_meta_data_a, tmt_n_term_a, unmod_sequence_a, calc_mass_a = initialize_peaks_xl(*inputs_a) + fragments_meta_data_b, tmt_n_term_b, unmod_sequence_b, calc_mass_b = initialize_peaks_xl(*inputs_b) + + if not unmod_sequence_a or not unmod_sequence_b: + return None + + matched_peaks_a = match_peaks( + fragments_meta_data_a, + spectrum[index_columns["INTENSITIES"]], + spectrum[index_columns["MZ"]], + tmt_n_term_a, + unmod_sequence_a, + spectrum[index_columns["PRECURSOR_CHARGE"]], + ) + + matched_peaks_b = match_peaks( + fragments_meta_data_b, + spectrum[index_columns["INTENSITIES"]], + spectrum[index_columns["MZ"]], + tmt_n_term_b, + unmod_sequence_b, + spectrum[index_columns["PRECURSOR_CHARGE"]], + ) + + intensities_a, mass_a, removed_peaks_a = _process_matched_peaks( + matched_peaks_a, unmod_sequence_a, array_size, matrix_func, spectrum[index_columns["CROSSLINKER_POSITION_A"]] + ) + intensities_b, mass_b, removed_peaks_b = _process_matched_peaks( + matched_peaks_b, unmod_sequence_b, array_size, matrix_func, spectrum[index_columns["CROSSLINKER_POSITION_B"]] + ) + + return intensities_a, intensities_b, mass_a, mass_b, calc_mass_a, calc_mass_b, removed_peaks_a, removed_peaks_b + + +def _process_matched_peaks(matched_peaks, unmod_sequence, array_size, matrix_func, crosslinker_position): + if len(matched_peaks) == 0: + intensities = np.full(array_size, 0.0) + mass = np.full(array_size, 0.0) + removed_peaks = 0 else: - fragments_meta_data, tmt_n_term, unmod_sequence, calc_mass = initialize_peaks( - spectrum[index_columns[mod_seq_column]], - spectrum[index_columns["MASS_ANALYZER"]], - spectrum[index_columns["PRECURSOR_CHARGE"]], - mass_tolerance, - unit_mass_tolerance, - ) - if not unmod_sequence: - return None - matched_peaks = match_peaks( - fragments_meta_data, - spectrum[index_columns["INTENSITIES"]], - spectrum[index_columns["MZ"]], - tmt_n_term, - unmod_sequence, - spectrum[index_columns["PRECURSOR_CHARGE"]], - ) - - if len(matched_peaks) == 0: - intensity = np.full(174, 0.0) - mass = np.full(174, 0.0) - return intensity, mass, calc_mass, 0 - matched_peaks, removed_peaks = handle_multiple_matches(matched_peaks) - intensities, mass = generate_annotation_matrix( - matched_peaks, unmod_sequence, spectrum[index_columns["PRECURSOR_CHARGE"]] - ) - return intensities, mass, calc_mass, removed_peaks + intensities, mass = matrix_func(matched_peaks, unmod_sequence, crosslinker_position) + return intensities, mass, removed_peaks