Skip to content

Commit

Permalink
Merge pull request #152 from wilhelm-lab/chore/koina_refactoring
Browse files Browse the repository at this point in the history
Chore/koina refactoring
  • Loading branch information
picciama authored Nov 9, 2023
2 parents da70c7b + a0f8a15 commit c85c0e4
Show file tree
Hide file tree
Showing 9 changed files with 550 additions and 175 deletions.
4 changes: 1 addition & 3 deletions docs/API.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,14 @@ Koina interface
.. autosummary::
:toctree: api/pr

pr.grpc_predict
pr.infer_predictions
pr.predict

Postprocessing koina response
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autosummary::
:toctree: api/pr


pr.parse_fragment_labels


Expand Down
3 changes: 1 addition & 2 deletions oktoberfest/data/spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class Spectra:
INTENSITY_COLUMN_PREFIX = "INTENSITY_RAW"
INTENSITY_PRED_PREFIX = "INTENSITY_PRED"
MZ_COLUMN_PREFIX = "MZ_RAW"
EPSILON = 1e-7
COLUMNS_FRAGMENT_ION = ["Y1+", "Y1++", "Y1+++", "B1+", "B1++", "B1+++"]

spectra_data: pd.DataFrame
Expand Down Expand Up @@ -129,7 +128,7 @@ def add_matrix(self, intensity_data: pd.Series, fragment_type: FragmentType) ->

# Change zeros to epislon to keep the info of invalid values
# change the -1 values to 0 (for better performance when converted to sparse representation)
intensity_array[intensity_array == 0] = Spectra.EPSILON
intensity_array[intensity_array == 0] = c.EPSILON
intensity_array[intensity_array == -1] = 0.0

# generate column names and build dataframe from sparse matrix
Expand Down
1 change: 1 addition & 0 deletions oktoberfest/predict/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Init predict."""
from .koina import Koina
from .predict import *
455 changes: 455 additions & 0 deletions oktoberfest/predict/koina.py

Large diffs are not rendered by default.

178 changes: 38 additions & 140 deletions oktoberfest/predict/predict.py
Original file line number Diff line number Diff line change
@@ -1,164 +1,61 @@
import logging
import re
from math import ceil
from multiprocessing import current_process
from typing import Dict, List, Tuple
from typing import Dict

import numpy as np
import pandas as pd
from spectrum_fundamentals.metrics.similarity import SimilarityMetrics
from tqdm.auto import tqdm
from tritonclient.grpc import InferenceServerClient, InferInput, InferRequestedOutput

from ..data.spectra import FragmentType, Spectra
from .koina import Koina

logger = logging.getLogger(__name__)


def grpc_predict(
library: Spectra,
url: str,
intensity_model: str,
irt_model: str,
ssl: bool = True,
alignment: bool = False,
job_type: str = "",
):
def predict(data: pd.DataFrame, *args, **kwargs) -> Dict[str, np.ndarray]:
"""
Use grpc to predict library and add predictions to library.
Retrieve predictions from koina.
:param library: Spectra object with the library
:param url: Url including the port of the prediction server
:param intensity_model: the name of the intensity model on the server
:param irt_model: the name of the irt model on the server
:param ssl: whether or not the server requires an ssl encrypted transportation, default = True
:param alignment: True if alignment present
:param job_type: TODO
:return: grpc predictions if we are trying to generate spectral library
"""
triton_client = InferenceServerClient(url=url, ssl=ssl)
batch_size = 1000

intensity_outputs = ["intensities", "mz", "annotation"]
intensity_input_data = {
"peptide_sequences": (
library.spectra_data["MODIFIED_SEQUENCE"].to_numpy().reshape(-1, 1).astype(np.object_),
"BYTES",
),
"collision_energies": (
library.spectra_data["COLLISION_ENERGY"].to_numpy().reshape(-1, 1).astype(np.float32),
"FP32",
),
"precursor_charges": (
library.spectra_data["PRECURSOR_CHARGE"].to_numpy().reshape(-1, 1).astype(np.int32),
"INT32",
),
}
if "tmt" in intensity_model.lower() or "ptm" in intensity_model.lower():
intensity_input_data["fragmentation_types"] = (
library.spectra_data["FRAGMENTATION"].to_numpy().reshape(-1, 1).astype(np.object_),
"BYTES",
)
This function takes a dataframe containing information about PSMS and predicts peptide
properties using a koina server. The configuration of koina is set using the kwargs.
See the koina predict function for details. TODO, link this properly.
intensity_predictions = infer_predictions(
triton_client,
model=intensity_model,
input_data=intensity_input_data,
outputs=intensity_outputs,
batch_size=batch_size,
)
intensity_predictions["intensities"][np.where(intensity_predictions["intensities"] < 1e-7)] = 0.0
:param data: Dataframe containing the data for the prediction.
:param args: Additional positional arguments forwarded to Koina::predict
:param kwargs: Additional keyword arguments forwarded to Koina::predict
irt_input_data = {"peptide_sequences": intensity_input_data["peptide_sequences"]}
irt_outputs = ["irt"]
irt_predictions = infer_predictions(
triton_client,
model=irt_model,
input_data=irt_input_data,
outputs=irt_outputs,
batch_size=batch_size,
:return: a dictionary with targets (keys) and predictions (values)
"""
predictor = Koina(*args, **kwargs)

data.rename(
columns={
"MODIFIED_SEQUENCE": "peptide_sequences",
"PRECURSOR_CHARGE": "precursor_charges",
"COLLISION_ENERGY": "collision_energies",
"FRAGMENTATION": "fragmentation_types",
},
inplace=True,
)

if job_type == "SpectralLibraryGeneration":
intensity_prediction_dict = {
"intensity": intensity_predictions["intensities"],
"fragmentmz": intensity_predictions["mz"],
"annotation": parse_fragment_labels(
intensity_predictions["annotation"],
library.spectra_data["PRECURSOR_CHARGE"].to_numpy()[:, None],
library.spectra_data["PEPTIDE_LENGTH"].to_numpy()[:, None],
),
}
output_dict = {intensity_model: intensity_prediction_dict, irt_model: irt_predictions["irt"]}
return output_dict

intensities_pred = pd.DataFrame()
intensities_pred["intensity"] = intensity_predictions["intensities"].tolist()
library.add_matrix(intensities_pred["intensity"], FragmentType.PRED)
results = predictor.predict(data)

if alignment:
return
data.rename(
columns={
"peptide_sequences": "MODIFIED_SEQUENCE",
"precursor_charges": "PRECURSOR_CHARGE",
"collision_energies": "COLLISION_ENERGY",
"fragmentation_types": "FRAGMENTATION",
},
inplace=True,
)

library.add_column(irt_predictions["irt"], name="PREDICTED_IRT")
return results


def infer_predictions(
triton_client: InferenceServerClient,
model: str,
input_data: Dict[str, Tuple[np.ndarray, str]],
outputs: List[str],
batch_size: int,
def parse_fragment_labels(
spectra_labels: np.ndarray, precursor_charges: np.ndarray, seq_lengths: np.ndarray
) -> Dict[str, np.ndarray]:
"""
Infer predictions from a triton client.
:param triton_client: An inference client using grpc
:param model: a model that is recognized by the server specified in the triton client
:param input_data: a dictionary that contains the input names (key) for the specific model
and a tuple of the input_data as a numpy array of shape [:, 1] and the dtype recognized
by the triton client (value).
:param outputs: a list of output names for the specific model
:param batch_size: the number of elements from the input_data that should be provided to the
triton client at once
:return: a dictionary containing the predictions (values) for the given outputs (keys)
"""
num_spec = len(input_data[list(input_data)[0]][0])
predictions: Dict[str, List[np.ndarray]] = {output: [] for output in outputs}

n_batches = ceil(num_spec / batch_size)
process_identity = current_process()._identity
if len(process_identity) > 0:
position = process_identity[0]
else:
position = 0

with tqdm(
total=n_batches,
position=position,
desc=f"Inferring predictions for {num_spec} spectra with batch site {batch_size}",
leave=True,
) as progress:
for i in range(0, n_batches):
progress.update(1)
# logger.info(f"Predicting batch {i+1}/{n_batches}.")
infer_inputs = []
for input_key, (data, dtype) in input_data.items():
batch_data = data[i * batch_size : (i + 1) * batch_size]
infer_input = InferInput(input_key, batch_data.shape, dtype)
infer_input.set_data_from_numpy(batch_data)
infer_inputs.append(infer_input)

infer_outputs = [InferRequestedOutput(output) for output in outputs]

prediction = triton_client.infer(model, inputs=infer_inputs, outputs=infer_outputs)

for output in outputs:
predictions[output].append(prediction.as_numpy(output))

return {key: np.vstack(value) for key, value in predictions.items()}


def parse_fragment_labels(spectra_labels: np.ndarray, precursor_charges: np.ndarray, seq_lengths: np.ndarray):
"""Uses regex to parse labels."""
pattern = rb"([y|b])([0-9]{1,2})\+([1-3])"
fragment_types = []
Expand Down Expand Up @@ -231,11 +128,12 @@ def ce_calibration(library: Spectra, **server_kwargs) -> pd.Series:
between predicted and observed intensities before returning the alignment library.
:param library: spectral library to perform CE calibration on
:param server_kwargs: Additional parameters that are forwarded to grpc_predict
:param server_kwargs: Additional parameters that are forwarded to the prediction method
:return: pandas series containing the spectral angle for all tested collision energies
"""
alignment_library = _prepare_alignment_df(library)
grpc_predict(alignment_library, alignment=True, **server_kwargs)
intensities = predict(alignment_library.spectra_data, **server_kwargs)
alignment_library.add_matrix(pd.Series(intensities["intensities"].tolist(), name="intensities"), FragmentType.PRED)
_alignment(alignment_library)
return alignment_library

Expand Down
45 changes: 31 additions & 14 deletions oktoberfest/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import List, Type, Union

import pandas as pd
from spectrum_io.spectral_library import MSP, DLib, SpectralLibrary, Spectronaut

from oktoberfest import __copyright__, __version__
Expand All @@ -12,7 +13,7 @@
from oktoberfest import preprocessing as pp
from oktoberfest import rescore as re

from .data.spectra import Spectra
from .data.spectra import FragmentType, Spectra
from .utils import Config, JobPool, ProcessStep

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,10 +91,9 @@ def _get_best_ce(library: Spectra, spectra_file: Path, config: Config) -> int:
results_dir.mkdir(exist_ok=True)
if (library.spectra_data["FRAGMENTATION"] == "HCD").any():
server_kwargs = {
"url": config.prediction_server,
"server_url": config.prediction_server,
"ssl": config.ssl,
"intensity_model": config.models["intensity"],
"irt_model": config.models["irt"],
"model_name": config.models["intensity"],
}
alignment_library = pr.ce_calibration(library, **server_kwargs)
ce_alignment = alignment_library.spectra_data.groupby(by=["COLLISION_ENERGY"])["SPECTRAL_ANGLE"].mean()
Expand Down Expand Up @@ -154,11 +154,8 @@ def generate_spectral_lib(config_path: Union[str, Path]):
no_of_sections = no_of_spectra // 7000

server_kwargs = {
"url": config.prediction_server,
"server_url": config.prediction_server,
"ssl": config.ssl,
"intensity_model": config.models["intensity"],
"irt_model": config.models["irt"],
"job_type": "SpectralLibraryGeneration",
}

spectral_library: Type[SpectralLibrary]
Expand Down Expand Up @@ -192,9 +189,21 @@ def generate_spectral_lib(config_path: Union[str, Path]):
else:
break

grpc_output_sec = pr.grpc_predict(spectra_div, **server_kwargs)
pred_intensities = pr.predict(spectra_div.spectra_data, model_name=config.models["intensity"], **server_kwargs)
pred_irts = pr.predict(spectra_div.spectra_data, model_name=config.models["irt"], **server_kwargs)

intensity_prediction_dict = {
"intensity": pred_intensities["intensities"],
"fragmentmz": pred_intensities["mz"],
"annotation": pr.parse_fragment_labels(
pred_intensities["annotation"],
spectra_div.spectra_data["PRECURSOR_CHARGE"].to_numpy()[:, None],
spectra_div.spectra_data["PEPTIDE_LENGTH"].to_numpy()[:, None],
),
}
output_dict = {config.models["intensity"]: intensity_prediction_dict, config.models["irt"]: pred_irts["irt"]}

out_lib = spectral_library(spectra_div.spectra_data, grpc_output_sec, out_file)
out_lib = spectral_library(spectra_div.spectra_data, output_dict, out_file)
out_lib.prepare_spectrum()
out_lib.write()

Expand Down Expand Up @@ -255,13 +264,21 @@ def _calculate_features(spectra_file: Path, config: Config):
return

server_kwargs = {
"url": config.prediction_server,
"server_url": config.prediction_server,
"ssl": config.ssl,
"intensity_model": config.models["intensity"],
"irt_model": config.models["irt"],
}

pr.grpc_predict(library, **server_kwargs)
pred_intensities = pr.predict(
library.spectra_data,
model_name=config.models["intensity"],
targets=["intensities", "annotation"],
**server_kwargs,
)
pred_irts = pr.predict(library.spectra_data, model_name=config.models["irt"], **server_kwargs)

library.add_matrix(pd.Series(pred_intensities["intensities"].tolist(), name="intensities"), FragmentType.PRED)
library.add_column(pred_irts["irt"], name="PREDICTED_IRT")

library.write_pred_as_hdf5(config.output / "data" / spectra_file.with_suffix(".mzml.pred.hdf5").name)

# produce percolator tab files
Expand Down
8 changes: 4 additions & 4 deletions tests/unit_tests/data/predictions/library_input.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
,"MODIFIED_SEQUENCE","COLLISION_ENERGY","PRECURSOR_CHARGE","FRAGMENTATION"
0,"[UNIMOD:737]-PEPTIDEK[UNIMOD:737]",30,2,"HCD"
1,"[UNIMOD:737]-PEPTIDE",30,2,"HCD"
2,"[UNIMOD:737]-M[UNIMOD:35]EC[UNIMOD:4]TIDEK[UNIMOD:737]",35,1,"CID"
"MODIFIED_SEQUENCE","COLLISION_ENERGY","PRECURSOR_CHARGE","FRAGMENTATION"
"[UNIMOD:737]-PEPTIDEK[UNIMOD:737]",30,2,"HCD"
"[UNIMOD:737]-PEPTIDE",30,2,"HCD"
"[UNIMOD:737]-M[UNIMOD:35]EC[UNIMOD:4]TIDEK[UNIMOD:737]",35,1,"CID"
Loading

0 comments on commit c85c0e4

Please sign in to comment.