diff --git a/oktoberfest/jobs/ce_calibration.py b/oktoberfest/jobs/ce_calibration.py new file mode 100644 index 00000000..ec80aba3 --- /dev/null +++ b/oktoberfest/jobs/ce_calibration.py @@ -0,0 +1,229 @@ +import logging +from pathlib import Path +from typing import List, Optional, Union + +import numpy as np +from sklearn.linear_model import LinearRegression, RANSACRegressor + +from oktoberfest import __copyright__, __version__ +from oktoberfest import plotting as pl +from oktoberfest import predict as pr +from oktoberfest import preprocessing as pp +from oktoberfest import rescore as re + +from ..data.spectra import Spectra +from ..utils import Config, JobPool, ProcessStep, group_iterator + +logger = logging.getLogger(__name__) + + +def preprocess(spectra_files: List[Path], config: Config) -> List[Path]: + preprocess_search_step = ProcessStep(config.output, "preprocessing_search") + if not preprocess_search_step.is_done(): + # load search results + if not config.search_results_type == "internal": + logger.info(f"Converting search results from {config.search_results} to internal search result.") + + msms_output = config.output / "msms" + msms_output.mkdir(exist_ok=True) + internal_search_file = msms_output / "msms.prosit" + tmt_label = config.tag + + search_results = pp.convert_search( + input_path=config.search_results, + search_engine=config.search_results_type, + tmt_label=tmt_label, + output_file=internal_search_file, + ) + if config.spectra_type.lower() in ["d", "hdf"]: + timstof_metadata = pp.convert_timstof_metadata( + input_path=config.search_results, + search_engine=config.search_results_type, + output_file=msms_output / "tims_meta.csv", + ) + else: + internal_search_file = config.search_results + search_results = pp.load_search(internal_search_file) + # TODO add support for internal timstof metadata + logger.info(f"Read {len(search_results)} PSMs from {internal_search_file}") + + # filter search results + search_results = pp.filter_peptides_for_model(peptides=search_results, model=config.models["intensity"]) + + # split search results + searchfiles_found = pp.split_search( + search_results=search_results, + output_dir=config.output / "msms", + filenames=[spectra_file.stem for spectra_file in spectra_files], + ) + # split timstof metadata + if config.spectra_type.lower() in ["d", "hdf"]: + _ = pp.split_timstof_metadata( + timstof_metadata=timstof_metadata, + output_dir=config.output / "msms", + filenames=searchfiles_found, + ) + preprocess_search_step.mark_done() + else: + searchfiles_found = [msms_file.stem for msms_file in (config.output / "msms").glob("*rescore")] + spectra_files_to_return = [] + for spectra_file in spectra_files: + if spectra_file.stem in searchfiles_found: + spectra_files_to_return.append(spectra_file) + + return spectra_files_to_return + + +def _annotate_and_get_library(spectra_file: Path, config: Config, tims_meta_file: Optional[Path] = None) -> Spectra: + data_dir = config.output / "data" + data_dir.mkdir(exist_ok=True) + hdf5_path = data_dir / spectra_file.with_suffix(".mzml.hdf5").name + if hdf5_path.is_file(): + aspec = Spectra.from_hdf5(hdf5_path) + instrument_type = config.instrument_type + if instrument_type is not None and aspec.obs["INSTRUMENT_TYPES"].values[0] != instrument_type: + aspec.obs["INSTRUMENT_TYPES"] = instrument_type + aspec.write_as_hdf5(hdf5_path) + else: + spectra_dir = config.output / "spectra" + spectra_dir.mkdir(exist_ok=True) + format_ = spectra_file.suffix.lower() + if format_ == ".raw": + file_to_load = spectra_dir / spectra_file.with_suffix(".mzML").name + pp.convert_raw_to_mzml(spectra_file, file_to_load, thermo_exe=config.thermo_exe) + elif format_ in [".mzml", ".hdf"]: + file_to_load = spectra_file + elif format_ == ".d": + file_to_load = spectra_dir / spectra_file.with_suffix(".hdf").name + pp.convert_d_to_hdf(spectra_file, file_to_load) + spectra = pp.load_spectra(file_to_load, tims_meta_file=tims_meta_file) + config_instrument_type = config.instrument_type + if config_instrument_type is not None: + spectra["INSTRUMENT_TYPES"] = config_instrument_type + search = pp.load_search(config.output / "msms" / spectra_file.with_suffix(".rescore").name) + library = pp.merge_spectra_and_peptides(spectra, search) + aspec = pp.annotate_spectral_library( + library, mass_tol=config.mass_tolerance, unit_mass_tol=config.unit_mass_tolerance + ) + aspec.write_as_hdf5(hdf5_path) # write_metadata_annotation + + return aspec + + +def _get_best_ce(library: Spectra, spectra_file: Path, config: Config): + results_dir = config.output / "results" + results_dir.mkdir(exist_ok=True) + if (library.obs["FRAGMENTATION"] == "HCD").any(): + server_kwargs = { + "server_url": config.prediction_server, + "ssl": config.ssl, + "model_name": config.models["intensity"], + } + use_ransac_model = config.use_ransac_model + alignment_library = pr.ce_calibration(library, config.ce_range, use_ransac_model, **server_kwargs) + + if use_ransac_model: + logger.info("Performing RANSAC regression") + calib_group = ( + alignment_library.obs.groupby( + by=["PRECURSOR_CHARGE", "ORIG_COLLISION_ENERGY", "COLLISION_ENERGY", "MASS"], as_index=False + )["SPECTRAL_ANGLE"] + .mean() + .groupby(["PRECURSOR_CHARGE", "ORIG_COLLISION_ENERGY", "MASS"], as_index=False) + .apply(lambda x: x.loc[x["SPECTRAL_ANGLE"].idxmax()]) + ) + calib_group["delta_collision_energy"] = ( + calib_group["COLLISION_ENERGY"] - calib_group["ORIG_COLLISION_ENERGY"] + ) + x = calib_group[["MASS", "PRECURSOR_CHARGE"]] # input feature + y = calib_group["delta_collision_energy"] # target variable + ransac = RANSACRegressor(LinearRegression(), residual_threshold=1.5, random_state=42) + ransac.fit(x, y) + + for charge, df in calib_group.groupby("PRECURSOR_CHARGE"): + r2_score = ransac.score(df[["MASS", "PRECURSOR_CHARGE"]], df["COLLISION_ENERGY"]) + title = f"Scatter Plot with RANSAC Model \nSlope: {ransac.estimator_.coef_[0]:.2f}, " + title += f"Intercept: {ransac.estimator_.intercept_:.2f}, R2: {r2_score:.2f}" + pl.plot_ce_ransac_model( + sa_ce_df=df, + filename=results_dir / f"{spectra_file.stem}_ce_ransac_model_{charge}.svg", + title=title, + ) + + delta_ce = ransac.predict(library.obs[["MASS", "PRECURSOR_CHARGE"]]) + library.obs["COLLISION_ENERGY"] = np.maximum(0, library.obs["COLLISION_ENERGY"] + delta_ce) + + else: + ce_alignment = alignment_library.obs.groupby(by=["COLLISION_ENERGY"])["SPECTRAL_ANGLE"].mean() + + best_ce = ce_alignment.idxmax() + pl.plot_mean_sa_ce( + sa_ce_df=ce_alignment.to_frame().reset_index(), + filename=results_dir / f"{spectra_file.stem}_mean_spectral_angle_ce.svg", + ) + pl.plot_violin_sa_ce( + sa_ce_df=alignment_library.obs[["COLLISION_ENERGY", "SPECTRAL_ANGLE"]], + filename=results_dir / f"{spectra_file.stem}_violin_spectral_angle_ce.svg", + ) + library.obs["COLLISION_ENERGY"] = best_ce + with open(results_dir / f"{spectra_file.stem}_ce.txt", "w") as f: + f.write(str(best_ce)) + f.close() + else: + best_ce = 35 + library.obs["COLLISION_ENERGY"] = best_ce + + with open(results_dir / f"{spectra_file.stem}_ce.txt", "w") as f: + f.write(str(best_ce)) + f.close()\ + + +def ce_calib(spectra_file: Path, config: Config) -> Spectra: + ce_calib_step = ProcessStep(config.output, "ce_calib." + spectra_file.stem) + if ce_calib_step.is_done(): + hdf5_path = config.output / "data" / spectra_file.with_suffix(".mzml.hdf5").name + if hdf5_path.is_file(): + library = Spectra.from_hdf5(hdf5_path) + return library + else: + raise FileNotFoundError(f"{hdf5_path} not found but ce_calib.{spectra_file.stem} found. Please check.") + tims_meta_file = None + if config.spectra_type.lower() in ["hdf", "d"]: # if it is timstof + tims_meta_file = config.output / "msms" / spectra_file.with_suffix(".timsmeta").name + aspec = _annotate_and_get_library(spectra_file, config, tims_meta_file=tims_meta_file) + _get_best_ce(aspec, spectra_file, config) + + aspec.write_as_hdf5(config.output / "data" / spectra_file.with_suffix(".mzml.hdf5").name) + + ce_calib_step.mark_done() + + return aspec + +def run_ce_calibration( + config_path: Union[str, Path], +): + """ + Create a CeCalibration object and run the CE calibration. + + # TODO full description + :param config_path: path to config file + """ + config = Config() + config.read(config_path) + + # load spectra file names + spectra_files = pp.list_spectra(input_dir=config.spectra, input_format=config.spectra_type) + + proc_dir = config.output / "proc" + proc_dir.mkdir(parents=True, exist_ok=True) + + spectra_files = preprocess(spectra_files, config) + + if config.num_threads > 1: + processing_pool = JobPool(processes=config.num_threads) + for spectra_file in spectra_files: + processing_pool.apply_async(ce_calib, [spectra_file, config]) + processing_pool.check_pool() + else: + for spectra_file in spectra_files: + ce_calib(spectra_file, config) \ No newline at end of file diff --git a/oktoberfest/jobs/rescoring.py b/oktoberfest/jobs/rescoring.py new file mode 100644 index 00000000..227c42ed --- /dev/null +++ b/oktoberfest/jobs/rescoring.py @@ -0,0 +1,152 @@ +import logging +from pathlib import Path +from typing import Union + +from oktoberfest import __copyright__, __version__ +from oktoberfest import plotting as pl +from oktoberfest import predict as pr +from oktoberfest import preprocessing as pp +from oktoberfest import rescore as re +from oktoberfest.jobs import ce_calibration as ce + +from ..utils import Config, JobPool, ProcessStep, group_iterator + +logger = logging.getLogger(__name__) + + +def _calculate_features(spectra_file: Path, config: Config): + library = ce.ce_calib(spectra_file, config) + + calc_feature_step = ProcessStep(config.output, "calculate_features." + spectra_file.stem) + if calc_feature_step.is_done(): + return + + predict_step = ProcessStep(config.output, "predict." + spectra_file.stem) + if not predict_step.is_done(): + + predict_kwargs = { + "server_url": config.prediction_server, + "ssl": config.ssl, + } + + if "alphapept" in config.models["intensity"].lower(): + chunk_idx = list(group_iterator(df=library.obs, group_by_column="PEPTIDE_LENGTH")) + else: + chunk_idx = None + pr.predict_intensities( + data=library, chunk_idx=chunk_idx, model_name=config.models["intensity"], **predict_kwargs + ) + + pr.predict_rt(data=library, model_name=config.models["irt"], **predict_kwargs) + + library.write_as_hdf5(config.output / "data" / spectra_file.with_suffix(".mzml.hdf5").name) + predict_step.mark_done() + + # produce percolator tab files + fdr_dir = config.output / "results" / config.fdr_estimation_method + fdr_dir.mkdir(exist_ok=True) + + re.generate_features( + library=library, + search_type="original", + output_file=fdr_dir / spectra_file.with_suffix(".original.tab").name, + all_features=config.all_features, + regression_method=config.curve_fitting_method, + ) + re.generate_features( + library=library, + search_type="rescore", + output_file=fdr_dir / spectra_file.with_suffix(".rescore.tab").name, + all_features=config.all_features, + regression_method=config.curve_fitting_method, + ) + + calc_feature_step.mark_done() + + +def _rescore(fdr_dir: Path, config: Config): + """ + High level rescore function for original and rescore. + + :param fdr_dir: the output directory + :param config: the configuration object + :raises ValueError: if the provided fdr estimation method in the config is not recognized + """ + rescore_original_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_original") + rescore_prosit_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_prosit") + + if config.fdr_estimation_method == "percolator": + if not rescore_original_step.is_done(): + re.rescore_with_percolator(input_file=fdr_dir / "original.tab", output_folder=fdr_dir) + rescore_original_step.mark_done() + if not rescore_prosit_step.is_done(): + re.rescore_with_percolator(input_file=fdr_dir / "rescore.tab", output_folder=fdr_dir) + rescore_prosit_step.mark_done() + elif config.fdr_estimation_method == "mokapot": + if not rescore_original_step.is_done(): + re.rescore_with_mokapot(input_file=fdr_dir / "original.tab", output_folder=fdr_dir) + rescore_original_step.mark_done() + if not rescore_prosit_step.is_done(): + re.rescore_with_mokapot(input_file=fdr_dir / "rescore.tab", output_folder=fdr_dir) + rescore_prosit_step.mark_done() + else: + raise ValueError( + 'f{config.fdr_estimation_method} is not a valid rescoring tool, use either "percolator" or "mokapot"' + ) + + +def run_rescoring(config_path: Union[str, Path]): + """ + Create a ReScore object and run the rescoring. + + # TODO full description + :param config_path: path to config file + """ + config = Config() + config.read(config_path) + + # load spectra file names + spectra_files = pp.list_spectra(input_dir=config.spectra, input_format=config.spectra_type) + + proc_dir = config.output / "proc" + proc_dir.mkdir(parents=True, exist_ok=True) + + spectra_files = ce.preprocess(spectra_files, config) + + if config.num_threads > 1: + processing_pool = JobPool(processes=config.num_threads) + for spectra_file in spectra_files: + processing_pool.apply_async(_calculate_features, [spectra_file, config]) + processing_pool.check_pool() + else: + for spectra_file in spectra_files: + _calculate_features(spectra_file, config) + + # prepare rescoring + + fdr_dir = config.output / "results" / config.fdr_estimation_method + + original_tab_files = [fdr_dir / spectra_file.with_suffix(".original.tab").name for spectra_file in spectra_files] + rescore_tab_files = [fdr_dir / spectra_file.with_suffix(".rescore.tab").name for spectra_file in spectra_files] + + prepare_tab_original_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_prepare_tab_original") + prepare_tab_rescore_step = ProcessStep(config.output, f"{config.fdr_estimation_method}_prepare_tab_prosit") + + if not prepare_tab_original_step.is_done(): + logger.info("Merging input tab files for rescoring without peptide property prediction") + re.merge_input(tab_files=original_tab_files, output_file=fdr_dir / "original.tab") + prepare_tab_original_step.mark_done() + + if not prepare_tab_rescore_step.is_done(): + logger.info("Merging input tab files for rescoring with peptide property prediction") + re.merge_input(tab_files=rescore_tab_files, output_file=fdr_dir / "rescore.tab") + prepare_tab_rescore_step.mark_done() + + # rescoring + _rescore(fdr_dir, config) + + # plotting + logger.info("Generating summary plots...") + pl.plot_all(fdr_dir) + + logger.info("Finished rescoring.") \ No newline at end of file diff --git a/oktoberfest/jobs/spectral_library.py b/oktoberfest/jobs/spectral_library.py new file mode 100644 index 00000000..f780496c --- /dev/null +++ b/oktoberfest/jobs/spectral_library.py @@ -0,0 +1,265 @@ +import logging +import pickle +import sys +import time +from functools import partial +from math import ceil +from multiprocessing import Manager, Process, pool +from pathlib import Path +from typing import Dict, List, Tuple, Type, Union + +import pandas as pd +from spectrum_io.spectral_library import MSP, DLib, SpectralLibrary, Spectronaut +from tqdm.auto import tqdm + +from oktoberfest import __copyright__, __version__ +from oktoberfest import predict as pr +from oktoberfest import preprocessing as pp + +from ..data.spectra import Spectra +from ..utils import Config, ProcessStep, group_iterator + +logger = logging.getLogger(__name__) + + +def _make_predictions_error_callback(failure_progress_tracker, failure_lock, error): + logger.error( + f"Prediction failed due to: {error} Batch will be missing from output file. " + "DO NOT STOP THIS RUN: The index of the batch is stored and your output file will be appended " + "by the missing batches if you rerun without changing your config file after this run is completed." + ) + with failure_lock: + failure_progress_tracker.value += 1 + +def _make_predictions(int_model, irt_model, predict_kwargs, queue_out, progress, lock, batch_df): + predictions = { + **pr.predict_at_once(batch_df, model_name=int_model, **predict_kwargs), + **pr.predict_at_once(batch_df, model_name=irt_model, **predict_kwargs), + } + queue_out.put((predictions, batch_df)) + with lock: + progress.value += 1 + + +def _get_writer_and_output(results_path: Path, output_format: str) -> Tuple[Type[SpectralLibrary], Path]: + if output_format == "msp": + return MSP, results_path / "myPrositLib.msp" + elif output_format == "spectronaut": + return Spectronaut, results_path / "myPrositLib.csv" + elif output_format == "dlib": + return DLib, results_path / "myPrositLib.dlib" + else: + raise ValueError(f"{output_format} is not supported as spectral library type") + + +def _get_batches_and_mode(out_file: Path, failed_batch_file: Path, obs: pd.DataFrame, batchsize: int, model: str): + if out_file.is_file(): + if failed_batch_file.is_file(): + with open(failed_batch_file, "rb") as fh: + batch_iterator = pickle.load(fh) + mode = "a" + logger.warning( + f"Found existing spectral library {out_file}. " + "Attempting to append missing batches from previous run..." + ) + else: + logger.error( + f"A file {out_file} already exists but no information about missing batches " + "from a previous run could be found. Stopping to prevent corruption / data loss. " + "If this is intended, delete the file and rerun." + ) + sys.exit(1) + else: + if "alphapept" in model.lower(): + batch_iterator = group_iterator(df=obs, group_by_column="PEPTIDE_LENGTH", max_batch_size=batchsize) + else: + batch_iterator = ( + obs.index[i * batchsize : (i + 1) * batchsize].to_numpy() for i in range(ceil(len(obs) / batchsize)) + ) + mode = "w" + + return list(batch_iterator), mode + + +def _speclib_from_digestion(config: Config) -> Spectra: + library_input_type = config.library_input_type + if library_input_type == "fasta": + digest_step = ProcessStep(config.output, "speclib_digested") + library_file = config.output / "prosit_input.csv" + if not digest_step.is_done(): + peptide_dict = pp.digest( + fasta=config.library_input, + digestion=config.digestion, + missed_cleavages=config.missed_cleavages, + db=config.db, + enzyme=config.enzyme, + special_aas=config.special_aas, + min_length=config.min_length, + max_length=config.max_length, + ) + metadata = pp.generate_metadata( + peptides=list(peptide_dict.keys()), + collision_energy=config.collision_energy, + precursor_charge=config.precursor_charge, + fragmentation=config.fragmentation, + instrument_type=config.instrument_type, + proteins=list(peptide_dict.values()), + ) + library_file = config.output / "prosit_input.csv" + metadata.to_csv(library_file, sep=",", index=None) + digest_step.mark_done() + elif library_input_type == "peptides": + library_file = config.library_input + else: + raise ValueError(f'Library input type {library_input_type} not understood. Can only be "fasta" or "peptides".') + spec_library = pp.gen_lib(library_file) + + pp_and_filter_step = ProcessStep(config.output, "speclib_filtered") + + data_dir = config.output / "data" + if not pp_and_filter_step.is_done(): + data_dir.mkdir(exist_ok=True) + spec_library = pp.process_and_filter_spectra_data( + library=spec_library, model=config.models["intensity"], tmt_label=config.tag + ) + spec_library.write_as_hdf5(data_dir / f"{library_file.stem}_filtered.hdf5") + pp_and_filter_step.mark_done() + else: + spec_library = Spectra.from_hdf5(data_dir / f"{library_file.stem}_filtered.hdf5") + + return spec_library + + +def _update(pbar: tqdm, postfix_values: Dict[str, int]): + total_val = sum(postfix_values.values()) + if total_val > pbar.n: + pbar.set_postfix(**postfix_values) + pbar.n = total_val + pbar.refresh() + + +def _check_write_failed_batch_file(failed_batch_file: Path, n_failed: int, results: List[pool.AsyncResult]): + if n_failed > 0: + failed_batches = [] + for i, result in enumerate(results): + try: + result.get() + except Exception: + failed_batches.append(i) + logger.error( + f"Prediction for {n_failed} / {i+1} batches failed. Check the log to find out why. " + "Then rerun without changing the config file to append only the missing batches to your output file." + ) + with open(failed_batch_file, "wb") as fh: + pickle.dump(failed_batches, fh) + sys.exit(1) + + +def generate_spectral_lib(config_path: Union[str, Path]): + """ + Create a SpectralLibrary object and generate the spectral library. + + # TODO full description + :param config_path: path to config file + """ + config = Config() + config.read(config_path) + + spec_library = _speclib_from_digestion(config) + + server_kwargs = { + "server_url": config.prediction_server, + "ssl": config.ssl, + "disable_progress_bar": True, + } + + speclib_written_step = ProcessStep(config.output, "speclib_written") + if not speclib_written_step.is_done(): + results_path = config.output / "results" + results_path.mkdir(exist_ok=True) + + batchsize = config.batch_size + failed_batch_file = config.output / "data" / "speclib_failed_batches.pkl" + writer, out_file = _get_writer_and_output(results_path, config.output_format) + batches, mode = _get_batches_and_mode( + out_file, failed_batch_file, spec_library.obs, batchsize, config.models["intensity"] + ) + speclib = writer(out_file, mode=mode, min_intensity_threshold=config.min_intensity) + n_batches = len(batches) + + with Manager() as manager: + # setup + shared_queue = manager.Queue(maxsize=config.num_threads) + prediction_progress = manager.Value("i", 0) + prediction_failure_progress = manager.Value("i", 0) + writing_progress = manager.Value("i", 0) + + lock = manager.Lock() + lock_failure = manager.Lock() + + # Create a pool for producer processes + predictor_pool = pool.Pool(config.num_threads) + + consumer_process = Process( + target=speclib.async_write, + args=( + shared_queue, + writing_progress, + ), + ) + + try: + results = [] + for batch in batches: + result = predictor_pool.apply_async( + _make_predictions, + ( + config.models["intensity"], + config.models["irt"], + server_kwargs, + shared_queue, + prediction_progress, + lock, + spec_library.obs.loc[batch], + ), + error_callback=partial( + _make_predictions_error_callback, prediction_failure_progress, lock_failure + ), + ) + results.append(result) + predictor_pool.close() + + with tqdm( + total=n_batches, desc="Writing library", postfix={"successful": 0, "missing": 0} + ) as writer_pbar: + consumer_process.start() + with tqdm( + total=n_batches, desc="Getting predictions", postfix={"successful": 0, "failed": 0} + ) as predictor_pbar: + while predictor_pbar.n < n_batches: + time.sleep(1) + pr_fail_val = prediction_failure_progress.value + _update(predictor_pbar, {"failed": pr_fail_val, "successful": prediction_progress.value}) + _update(writer_pbar, {"successful": writing_progress.value, "missing": pr_fail_val}) + shared_queue.put(None) # signal the writer process, that it is done + predictor_pool.join() # properly await and terminate the pool + + while writer_pbar.n < n_batches: # have to keep updating the writer pbar + time.sleep(1) + _update( + writer_pbar, + {"successful": writing_progress.value, "missing": prediction_failure_progress.value}, + ) + consumer_process.join() # properly await the termination of the writer process + + _check_write_failed_batch_file(failed_batch_file, prediction_failure_progress.value, results) + + finally: + predictor_pool.terminate() + predictor_pool.join() + consumer_process.terminate() + consumer_process.join() + + logger.info("Finished writing the library to disk") + failed_batch_file.unlink(missing_ok=True) + speclib_written_step.mark_done() \ No newline at end of file