From 06b8525a3d106caef36b4df2b56ec120d9cbb364 Mon Sep 17 00:00:00 2001 From: Fabian Basso Date: Fri, 12 Jul 2024 13:23:53 +0000 Subject: [PATCH] changes for every search engine --- spectrum_io/search_result/mascot.py | 59 +++++++++++++++++++-- spectrum_io/search_result/maxquant.py | 23 ++++---- spectrum_io/search_result/msfragger.py | 17 +++--- spectrum_io/search_result/sage.py | 10 ++-- spectrum_io/search_result/search_results.py | 11 ++-- 5 files changed, 89 insertions(+), 31 deletions(-) diff --git a/spectrum_io/search_result/mascot.py b/spectrum_io/search_result/mascot.py index 3468868..13d63fe 100644 --- a/spectrum_io/search_result/mascot.py +++ b/spectrum_io/search_result/mascot.py @@ -1,7 +1,8 @@ import logging +import re import sqlite3 from pathlib import Path -from typing import Optional, Union +from typing import Optional, Union, Dict, Tuple import pandas as pd import spectrum_fundamentals.constants as c @@ -15,7 +16,8 @@ class Mascot(SearchResults): """Handle search results from Mascot.""" - def read_result(self, tmt_labeled: str) -> pd.DataFrame: + def read_result(self, tmt_labeled: str, custom_stat_mods: Dict[str, Tuple[str, float]] = None, + custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Function to read a mascot msf file and perform some basic formatting. @@ -76,13 +78,60 @@ def read_result(self, tmt_labeled: str) -> pd.DataFrame: ).agg({"MODIFICATIONS": "|".join}) mod_masses_reverse = {round(float(v), 3): k for k, v in c.MOD_MASSES.items()} + + def custom_regex_escape(key: str) -> str: + """ + Subfunction to escape only normal brackets in the modstring. + + :param key: The match to escape + :return: match with escaped special characters + """ + for k, v in {"[": r"\[", "]": r"\]", "(": r"\(", ")": r"\)"}.items(): + key = key.replace(k, v) + return key + + def find_replacement(match: re.Match, seq: str) -> str: + """ + Subfunction to find the corresponding substitution for a match. 
+ + :param match: an re.Match object found by re.sub + :return: substitution string for the given match + """ + key = match.string[match.start() : match.end()] + if custom_var_mods is not None and key in custom_var_mods.keys(): + assert isinstance(custom_mods[key][0], str), f"Provided illegal custom mod format, expected dict-values are (str, float), recieved {(type(custom_mods[key][0]).__name__), (type(custom_mods[key][1]).__name__)}." + end = match.span()[1] + if end < len(seq) and (seq[end] == "[" or seq[end]== "("): + return key + if not custom_mods[key][0].startswith(key): + return key + custom_mods[key][0] + return custom_mods[key][0] + elif custom_stat_mods is not None and key in custom_stat_mods.keys(): + assert isinstance(custom_mods[key][0], str), f"Provided illegal custom mod format, expected dict-values are (str, float), recieved {(type(replacements[key][0]).__name__), (type(replacements[key][1]).__name__)}." + return custom_mods[key][0] + return custom_mods[key] + + + custom_mods = {} + + if custom_var_mods is not None: + custom_mods.update(custom_var_mods) + if custom_stat_mods is not None: + custom_mods.update(custom_stat_mods) + + if custom_mods: + regex = re.compile("|".join(map(custom_regex_escape, custom_mods.keys()))) + sequences = [] for _, row in df.iterrows(): modifications = row["MODIFICATIONS"].split("|") + sequence = row["SEQUENCE"] + if custom_mods: + sequence = regex.sub(lambda match: find_replacement(match, sequence), sequence) + if len(modifications) == 0: - sequences.append(row["SEQUENCE"]) - else: - sequence = row["SEQUENCE"] + sequences.append(sequence) + else: skip = 0 for mod in modifications: pos, mass = mod.split("$") diff --git a/spectrum_io/search_result/maxquant.py b/spectrum_io/search_result/maxquant.py index e68d830..0453c03 100644 --- a/spectrum_io/search_result/maxquant.py +++ b/spectrum_io/search_result/maxquant.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Union +from typing import Union, 
Dict, Tuple import pandas as pd import spectrum_fundamentals.constants as c @@ -41,7 +41,8 @@ def add_tmt_mod(mass: float, seq: str, unimod_tag: str) -> float: mass += num_of_tmt * c.MOD_MASSES[f"{unimod_tag}"] return mass - def read_result(self, tmt_labeled: str) -> pd.DataFrame: + def read_result(self, tmt_labeled: str, custom_stat_mods: Dict[str, Tuple[str, float]] = None, + custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Function to read a msms txt and perform some basic formatting. @@ -72,11 +73,12 @@ def read_result(self, tmt_labeled: str) -> pd.DataFrame: df.columns = df.columns.str.upper() df.columns = df.columns.str.replace(" ", "_") - df = MaxQuant.update_columns_for_prosit(df, tmt_labeled) + df = MaxQuant.update_columns_for_prosit(df, tmt_labeled, custom_stat_mods, custom_var_mods) return filter_valid_prosit_sequences(df) @staticmethod - def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str) -> pd.DataFrame: + def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str, custom_stat_mods: Dict[str, Tuple[str, float]] = None, + custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Update columns of df to work with Prosit. 
@@ -94,8 +96,8 @@ def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str) -> pd.DataFram logger.info("Adding TMT fixed modifications") df["MODIFIED_SEQUENCE"] = maxquant_to_internal( df["MODIFIED_SEQUENCE"].to_numpy(), - fixed_mods={"C": "C[UNIMOD:4]", "^_": f"_{unimod_tag}-", "K": f"K{unimod_tag}"}, - ) + fixed_mods={"C": "C[UNIMOD:4]", "^_": f"_{unimod_tag}-", "K": f"K{unimod_tag}"}, + stat_custom_mods=custom_stat_mods, var_custom_mods=custom_var_mods) df["MASS"] = df.apply(lambda x: MaxQuant.add_tmt_mod(x.MASS, x.MODIFIED_SEQUENCE, unimod_tag), axis=1) if "msa" in tmt_labeled: logger.info("Replacing phospho by dehydration for Phospho-MSA") @@ -106,16 +108,19 @@ def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str) -> pd.DataFram logger.info("Adding SILAC fixed modifications") df.loc[df["LABELING_STATE"] == 1, "MODIFIED_SEQUENCE"] = maxquant_to_internal( df[df["LABELING_STATE"] == 1]["MODIFIED_SEQUENCE"].to_numpy(), - fixed_mods={"C": "C[UNIMOD:4]", "K": "K[UNIMOD:259]", "R": "R[UNIMOD:267]"}, + fixed_mods={"C": "C[UNIMOD:4]", "K": "K[UNIMOD:259]", "R": "R[UNIMOD:267]"}, stat_custom_mods=custom_stat_mods, + var_custom_mods=custom_var_mods ) df.loc[df["LABELING_STATE"] != 1, "MODIFIED_SEQUENCE"] = maxquant_to_internal( - df[df["LABELING_STATE"] != 1]["MODIFIED_SEQUENCE"].to_numpy() + df[df["LABELING_STATE"] != 1]["MODIFIED_SEQUENCE"].to_numpy(), stat_custom_mods=custom_stat_mods, + var_custom_mods=custom_var_mods ) df["MASS"] = df.apply(lambda x: MaxQuant.add_tmt_mod(x.MASS, x.MODIFIED_SEQUENCE, "[UNIMOD:259]"), axis=1) df["MASS"] = df.apply(lambda x: MaxQuant.add_tmt_mod(x.MASS, x.MODIFIED_SEQUENCE, "[UNIMOD:267]"), axis=1) df.drop(columns=["LABELING_STATE"], inplace=True) else: - df["MODIFIED_SEQUENCE"] = maxquant_to_internal(df["MODIFIED_SEQUENCE"].to_numpy()) + df["MODIFIED_SEQUENCE"] = maxquant_to_internal(df["MODIFIED_SEQUENCE"].to_numpy(), stat_custom_mods=custom_stat_mods, + var_custom_mods=custom_var_mods) df["SEQUENCE"] = 
internal_without_mods(df["MODIFIED_SEQUENCE"]) df["PEPTIDE_LENGTH"] = df["SEQUENCE"].apply(lambda x: len(x)) df["PROTEINS"].fillna("UNKNOWN", inplace=True) diff --git a/spectrum_io/search_result/msfragger.py b/spectrum_io/search_result/msfragger.py index 142f299..cb9de37 100644 --- a/spectrum_io/search_result/msfragger.py +++ b/spectrum_io/search_result/msfragger.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Union +from typing import Union, Dict, Tuple import pandas as pd import spectrum_fundamentals.constants as c @@ -16,7 +16,8 @@ class MSFragger(SearchResults): """Handle search results from MSFragger.""" - def read_result(self, tmt_labeled: str) -> pd.DataFrame: + def read_result(self, tmt_labeled: str, custom_stat_mods: Dict[str, Tuple[str, float]] = None, + custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Function to read a msms txt and perform some basic formatting. @@ -37,11 +38,12 @@ def read_result(self, tmt_labeled: str) -> pd.DataFrame: df = pd.concat(ms_frag_results) - df = update_columns_for_prosit(df, tmt_labeled) + df = update_columns_for_prosit(df, tmt_labeled, custom_stat_mods=custom_stat_mods, custom_var_mods=custom_var_mods) return filter_valid_prosit_sequences(df) -def update_columns_for_prosit(df, tmt_labeled: str) -> pd.DataFrame: +def update_columns_for_prosit(df, tmt_labeled: str, custom_stat_mods: Dict[str, Tuple[str, float]] = None, + custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Update columns of df to work with Prosit. 
@@ -61,10 +63,11 @@ def update_columns_for_prosit(df, tmt_labeled: str) -> pd.DataFrame: logger.info("Adding TMT fixed modifications") df["MODIFIED_SEQUENCE"] = msfragger_to_internal( df["modified_peptide"].to_list(), - fixed_mods={"C": "C[UNIMOD:4]", r"n[\d+]": f"{unimod_tag}-", "K": f"K{unimod_tag}"}, - ) + fixed_mods={"C": "C[UNIMOD:4]", r"n[\d+]": f"{unimod_tag}-", "K": f"K{unimod_tag}"}, stat_custom_mods=custom_stat_mods, + var_custom_mods=custom_var_mods) else: - df["MODIFIED_SEQUENCE"] = msfragger_to_internal(df["modified_peptide"].to_list()) + df["MODIFIED_SEQUENCE"] = msfragger_to_internal(df["modified_peptide"].to_list(), stat_custom_mods=custom_stat_mods, + var_custom_mods=custom_var_mods) df.rename( columns={ diff --git a/spectrum_io/search_result/sage.py b/spectrum_io/search_result/sage.py index 78b10c4..11ccae9 100644 --- a/spectrum_io/search_result/sage.py +++ b/spectrum_io/search_result/sage.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import Union +from typing import Union, Dict, Tuple import pandas as pd import spectrum_fundamentals.constants as c @@ -14,7 +14,7 @@ class Sage(SearchResults): """Handle search results from Sage.""" - def read_result(self, tmt_labeled: str = "") -> pd.DataFrame: + def read_result(self, tmt_labeled: str = "", custom_mods: Dict[str, str] = None) -> pd.DataFrame: """ Function to read a msms tsv and perform some basic formatting. 
@@ -33,11 +33,11 @@ def read_result(self, tmt_labeled: str = "") -> pd.DataFrame: df.columns = df.columns.str.upper() df.columns = df.columns.str.replace(" ", "_") - df = Sage.update_columns_for_prosit(df, tmt_labeled) + df = Sage.update_columns_for_prosit(df, tmt_labeled, custom_mods) return filter_valid_prosit_sequences(df) @staticmethod - def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str) -> pd.DataFrame: + def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str, custom_stat_mods: Dict[str, Tuple[str, float]] = None, custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Update columns of df to work with Prosit. @@ -68,7 +68,7 @@ def update_columns_for_prosit(df: pd.DataFrame, tmt_labeled: str) -> pd.DataFram # length of the peptide df["PEPTIDE_LENGTH"] = df["SEQUENCE"].str.len() # converting sage to unimod - df["MODIFIED_SEQUENCE"] = sage_to_internal(df["MODIFIED_SEQUENCE"]) + df["MODIFIED_SEQUENCE"] = sage_to_internal(df["MODIFIED_SEQUENCE"], stat_custom_mods=custom_stat_mods, var_custom_mods=custom_var_mods) df["PROTEINS"].fillna("UNKNOWN", inplace=True) return df diff --git a/spectrum_io/search_result/search_results.py b/spectrum_io/search_result/search_results.py index 8e9a6b5..b91c91d 100644 --- a/spectrum_io/search_result/search_results.py +++ b/spectrum_io/search_result/search_results.py @@ -2,7 +2,7 @@ import re from abc import abstractmethod from pathlib import Path -from typing import Optional, Union +from typing import Optional, Union, Dict, Tuple import pandas as pd @@ -51,7 +51,7 @@ def __init__(self, path: Union[str, Path]): self.path = path @abstractmethod - def read_result(self, tmt_labeled: str): + def read_result(self, tmt_labeled: str, custom_mods: Dict[str, str]): """Read result. 
:param tmt_labeled: tmt label as str @@ -59,7 +59,7 @@ def read_result(self, tmt_labeled: str): """ raise NotImplementedError - def generate_internal(self, tmt_labeled: str, out_path: Optional[Union[str, Path]] = None) -> pd.DataFrame: + def generate_internal(self, tmt_labeled: str, out_path: Optional[Union[str, Path]] = None, custom_stat_mods: Dict[str, Tuple[str, float]] = None, custom_var_mods: Dict[str, Tuple[str, float]] = None) -> pd.DataFrame: """ Generate df and save to out_path if provided. @@ -69,7 +69,7 @@ def generate_internal(self, tmt_labeled: str, out_path: Optional[Union[str, Path """ if out_path is None: # convert and return - return self.read_result(tmt_labeled) + return self.read_result(tmt_labeled, custom_stat_mods, custom_var_mods) if isinstance(out_path, str): out_path = Path(out_path) @@ -77,10 +77,11 @@ def generate_internal(self, tmt_labeled: str, out_path: Optional[Union[str, Path if out_path.is_file(): # only read converted and return logger.info(f"Found search results in internal format at {out_path}, skipping conversion") + #TODO: internal_to_unimod return csv.read_file(out_path) # convert, save and return - df = self.read_result(tmt_labeled) + df = self.read_result(tmt_labeled, custom_stat_mods, custom_var_mods) csv.write_file(df, out_path) return df