Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/koina refactoring #152

Merged
merged 7 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docs/API.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,14 @@ Koina interface
.. autosummary::
:toctree: api/pr

pr.grpc_predict
pr.infer_predictions
pr.predict

Postprocessing koina response
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autosummary::
:toctree: api/pr


pr.parse_fragment_labels


Expand Down
3 changes: 1 addition & 2 deletions oktoberfest/data/spectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class Spectra:
INTENSITY_COLUMN_PREFIX = "INTENSITY_RAW"
INTENSITY_PRED_PREFIX = "INTENSITY_PRED"
MZ_COLUMN_PREFIX = "MZ_RAW"
EPSILON = 1e-7
COLUMNS_FRAGMENT_ION = ["Y1+", "Y1++", "Y1+++", "B1+", "B1++", "B1+++"]

spectra_data: pd.DataFrame
Expand Down Expand Up @@ -129,7 +128,7 @@ def add_matrix(self, intensity_data: pd.Series, fragment_type: FragmentType) ->

# Change zeros to epsilon to keep the info of invalid values
# change the -1 values to 0 (for better performance when converted to sparse representation)
intensity_array[intensity_array == 0] = Spectra.EPSILON
intensity_array[intensity_array == 0] = c.EPSILON
intensity_array[intensity_array == -1] = 0.0

# generate column names and build dataframe from sparse matrix
Expand Down
1 change: 1 addition & 0 deletions oktoberfest/predict/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Init predict."""
from .koina import Koina
from .predict import *
455 changes: 455 additions & 0 deletions oktoberfest/predict/koina.py

Large diffs are not rendered by default.

178 changes: 38 additions & 140 deletions oktoberfest/predict/predict.py
Original file line number Diff line number Diff line change
@@ -1,164 +1,61 @@
import logging
import re
from math import ceil
from multiprocessing import current_process
from typing import Dict, List, Tuple
from typing import Dict

import numpy as np
import pandas as pd
from spectrum_fundamentals.metrics.similarity import SimilarityMetrics
from tqdm.auto import tqdm
from tritonclient.grpc import InferenceServerClient, InferInput, InferRequestedOutput

from ..data.spectra import FragmentType, Spectra
from .koina import Koina

logger = logging.getLogger(__name__)


def grpc_predict(
library: Spectra,
url: str,
intensity_model: str,
irt_model: str,
ssl: bool = True,
alignment: bool = False,
job_type: str = "",
):
def predict(data: pd.DataFrame, *args, **kwargs) -> Dict[str, np.ndarray]:
"""
Use grpc to predict library and add predictions to library.
Retrieve predictions from koina.

:param library: Spectra object with the library
:param url: Url including the port of the prediction server
:param intensity_model: the name of the intensity model on the server
:param irt_model: the name of the irt model on the server
:param ssl: whether or not the server requires an ssl encrypted transportation, default = True
:param alignment: True if alignment present
:param job_type: TODO
:return: grpc predictions if we are trying to generate spectral library
"""
triton_client = InferenceServerClient(url=url, ssl=ssl)
batch_size = 1000

intensity_outputs = ["intensities", "mz", "annotation"]
intensity_input_data = {
"peptide_sequences": (
library.spectra_data["MODIFIED_SEQUENCE"].to_numpy().reshape(-1, 1).astype(np.object_),
"BYTES",
),
"collision_energies": (
library.spectra_data["COLLISION_ENERGY"].to_numpy().reshape(-1, 1).astype(np.float32),
"FP32",
),
"precursor_charges": (
library.spectra_data["PRECURSOR_CHARGE"].to_numpy().reshape(-1, 1).astype(np.int32),
"INT32",
),
}
if "tmt" in intensity_model.lower() or "ptm" in intensity_model.lower():
intensity_input_data["fragmentation_types"] = (
library.spectra_data["FRAGMENTATION"].to_numpy().reshape(-1, 1).astype(np.object_),
"BYTES",
)
This function takes a dataframe containing information about PSMs and predicts peptide
properties using a Koina server. The configuration of Koina is set using the kwargs.
See the Koina predict function for details. TODO: link this properly.

intensity_predictions = infer_predictions(
triton_client,
model=intensity_model,
input_data=intensity_input_data,
outputs=intensity_outputs,
batch_size=batch_size,
)
intensity_predictions["intensities"][np.where(intensity_predictions["intensities"] < 1e-7)] = 0.0
:param data: Dataframe containing the data for the prediction.
:param args: Additional positional arguments forwarded to the Koina constructor
:param kwargs: Additional keyword arguments forwarded to the Koina constructor

irt_input_data = {"peptide_sequences": intensity_input_data["peptide_sequences"]}
irt_outputs = ["irt"]
irt_predictions = infer_predictions(
triton_client,
model=irt_model,
input_data=irt_input_data,
outputs=irt_outputs,
batch_size=batch_size,
:return: a dictionary with targets (keys) and predictions (values)
"""
predictor = Koina(*args, **kwargs)

data.rename(
columns={
"MODIFIED_SEQUENCE": "peptide_sequences",
"PRECURSOR_CHARGE": "precursor_charges",
"COLLISION_ENERGY": "collision_energies",
"FRAGMENTATION": "fragmentation_types",
},
inplace=True,
)

if job_type == "SpectralLibraryGeneration":
intensity_prediction_dict = {
"intensity": intensity_predictions["intensities"],
"fragmentmz": intensity_predictions["mz"],
"annotation": parse_fragment_labels(
intensity_predictions["annotation"],
library.spectra_data["PRECURSOR_CHARGE"].to_numpy()[:, None],
library.spectra_data["PEPTIDE_LENGTH"].to_numpy()[:, None],
),
}
output_dict = {intensity_model: intensity_prediction_dict, irt_model: irt_predictions["irt"]}
return output_dict

intensities_pred = pd.DataFrame()
intensities_pred["intensity"] = intensity_predictions["intensities"].tolist()
library.add_matrix(intensities_pred["intensity"], FragmentType.PRED)
results = predictor.predict(data)

if alignment:
return
data.rename(
columns={
"peptide_sequences": "MODIFIED_SEQUENCE",
"precursor_charges": "PRECURSOR_CHARGE",
"collision_energies": "COLLISION_ENERGY",
"fragmentation_types": "FRAGMENTATION",
},
inplace=True,
)

library.add_column(irt_predictions["irt"], name="PREDICTED_IRT")
return results


def infer_predictions(
triton_client: InferenceServerClient,
model: str,
input_data: Dict[str, Tuple[np.ndarray, str]],
outputs: List[str],
batch_size: int,
def parse_fragment_labels(
spectra_labels: np.ndarray, precursor_charges: np.ndarray, seq_lengths: np.ndarray
) -> Dict[str, np.ndarray]:
"""
Infer predictions from a triton client.

:param triton_client: An inference client using grpc
:param model: a model that is recognized by the server specified in the triton client
:param input_data: a dictionary that contains the input names (key) for the specific model
and a tuple of the input_data as a numpy array of shape [:, 1] and the dtype recognized
by the triton client (value).
:param outputs: a list of output names for the specific model
:param batch_size: the number of elements from the input_data that should be provided to the
triton client at once
:return: a dictionary containing the predictions (values) for the given outputs (keys)
"""
num_spec = len(input_data[list(input_data)[0]][0])
predictions: Dict[str, List[np.ndarray]] = {output: [] for output in outputs}

n_batches = ceil(num_spec / batch_size)
process_identity = current_process()._identity
if len(process_identity) > 0:
position = process_identity[0]
else:
position = 0

with tqdm(
total=n_batches,
position=position,
desc=f"Inferring predictions for {num_spec} spectra with batch site {batch_size}",
leave=True,
) as progress:
for i in range(0, n_batches):
progress.update(1)
# logger.info(f"Predicting batch {i+1}/{n_batches}.")
infer_inputs = []
for input_key, (data, dtype) in input_data.items():
batch_data = data[i * batch_size : (i + 1) * batch_size]
infer_input = InferInput(input_key, batch_data.shape, dtype)
infer_input.set_data_from_numpy(batch_data)
infer_inputs.append(infer_input)

infer_outputs = [InferRequestedOutput(output) for output in outputs]

prediction = triton_client.infer(model, inputs=infer_inputs, outputs=infer_outputs)

for output in outputs:
predictions[output].append(prediction.as_numpy(output))

return {key: np.vstack(value) for key, value in predictions.items()}


def parse_fragment_labels(spectra_labels: np.ndarray, precursor_charges: np.ndarray, seq_lengths: np.ndarray):
"""Uses regex to parse labels."""
pattern = rb"([y|b])([0-9]{1,2})\+([1-3])"
fragment_types = []
Expand Down Expand Up @@ -231,11 +128,12 @@
between predicted and observed intensities before returning the alignment library.

:param library: spectral library to perform CE calibration on
:param server_kwargs: Additional parameters that are forwarded to grpc_predict
:param server_kwargs: Additional parameters that are forwarded to the prediction method
:return: pandas series containing the spectral angle for all tested collision energies
"""
alignment_library = _prepare_alignment_df(library)
grpc_predict(alignment_library, alignment=True, **server_kwargs)
intensities = predict(alignment_library.spectra_data, **server_kwargs)
alignment_library.add_matrix(pd.Series(intensities["intensities"].tolist(), name="intensities"), FragmentType.PRED)

Check warning on line 136 in oktoberfest/predict/predict.py

View check run for this annotation

Codecov / codecov/patch

oktoberfest/predict/predict.py#L135-L136

Added lines #L135 - L136 were not covered by tests
_alignment(alignment_library)
return alignment_library

Expand Down
45 changes: 31 additions & 14 deletions oktoberfest/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pathlib import Path
from typing import List, Type, Union

import pandas as pd
from spectrum_io.spectral_library import MSP, DLib, SpectralLibrary, Spectronaut

from oktoberfest import __copyright__, __version__
Expand All @@ -12,7 +13,7 @@
from oktoberfest import preprocessing as pp
from oktoberfest import rescore as re

from .data.spectra import Spectra
from .data.spectra import FragmentType, Spectra
from .utils import Config, JobPool, ProcessStep

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,10 +91,9 @@
results_dir.mkdir(exist_ok=True)
if (library.spectra_data["FRAGMENTATION"] == "HCD").any():
server_kwargs = {
"url": config.prediction_server,
"server_url": config.prediction_server,
"ssl": config.ssl,
"intensity_model": config.models["intensity"],
"irt_model": config.models["irt"],
"model_name": config.models["intensity"],
}
alignment_library = pr.ce_calibration(library, **server_kwargs)
ce_alignment = alignment_library.spectra_data.groupby(by=["COLLISION_ENERGY"])["SPECTRAL_ANGLE"].mean()
Expand Down Expand Up @@ -154,11 +154,8 @@
no_of_sections = no_of_spectra // 7000

server_kwargs = {
"url": config.prediction_server,
"server_url": config.prediction_server,
"ssl": config.ssl,
"intensity_model": config.models["intensity"],
"irt_model": config.models["irt"],
"job_type": "SpectralLibraryGeneration",
}

spectral_library: Type[SpectralLibrary]
Expand Down Expand Up @@ -192,9 +189,21 @@
else:
break

grpc_output_sec = pr.grpc_predict(spectra_div, **server_kwargs)
pred_intensities = pr.predict(spectra_div.spectra_data, model_name=config.models["intensity"], **server_kwargs)
pred_irts = pr.predict(spectra_div.spectra_data, model_name=config.models["irt"], **server_kwargs)

intensity_prediction_dict = {
"intensity": pred_intensities["intensities"],
"fragmentmz": pred_intensities["mz"],
"annotation": pr.parse_fragment_labels(
pred_intensities["annotation"],
spectra_div.spectra_data["PRECURSOR_CHARGE"].to_numpy()[:, None],
spectra_div.spectra_data["PEPTIDE_LENGTH"].to_numpy()[:, None],
),
}
output_dict = {config.models["intensity"]: intensity_prediction_dict, config.models["irt"]: pred_irts["irt"]}

out_lib = spectral_library(spectra_div.spectra_data, grpc_output_sec, out_file)
out_lib = spectral_library(spectra_div.spectra_data, output_dict, out_file)
out_lib.prepare_spectrum()
out_lib.write()

Expand Down Expand Up @@ -255,13 +264,21 @@
return

server_kwargs = {
"url": config.prediction_server,
"server_url": config.prediction_server,
"ssl": config.ssl,
"intensity_model": config.models["intensity"],
"irt_model": config.models["irt"],
}

pr.grpc_predict(library, **server_kwargs)
pred_intensities = pr.predict(

Check warning on line 271 in oktoberfest/runner.py

View check run for this annotation

Codecov / codecov/patch

oktoberfest/runner.py#L271

Added line #L271 was not covered by tests
library.spectra_data,
model_name=config.models["intensity"],
targets=["intensities", "annotation"],
**server_kwargs,
)
pred_irts = pr.predict(library.spectra_data, model_name=config.models["irt"], **server_kwargs)

Check warning on line 277 in oktoberfest/runner.py

View check run for this annotation

Codecov / codecov/patch

oktoberfest/runner.py#L277

Added line #L277 was not covered by tests

library.add_matrix(pd.Series(pred_intensities["intensities"].tolist(), name="intensities"), FragmentType.PRED)
library.add_column(pred_irts["irt"], name="PREDICTED_IRT")

Check warning on line 280 in oktoberfest/runner.py

View check run for this annotation

Codecov / codecov/patch

oktoberfest/runner.py#L279-L280

Added lines #L279 - L280 were not covered by tests

library.write_pred_as_hdf5(config.output / "data" / spectra_file.with_suffix(".mzml.pred.hdf5").name)

# produce percolator tab files
Expand Down
8 changes: 4 additions & 4 deletions tests/unit_tests/data/predictions/library_input.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
,"MODIFIED_SEQUENCE","COLLISION_ENERGY","PRECURSOR_CHARGE","FRAGMENTATION"
0,"[UNIMOD:737]-PEPTIDEK[UNIMOD:737]",30,2,"HCD"
1,"[UNIMOD:737]-PEPTIDE",30,2,"HCD"
2,"[UNIMOD:737]-M[UNIMOD:35]EC[UNIMOD:4]TIDEK[UNIMOD:737]",35,1,"CID"
"MODIFIED_SEQUENCE","COLLISION_ENERGY","PRECURSOR_CHARGE","FRAGMENTATION"
"[UNIMOD:737]-PEPTIDEK[UNIMOD:737]",30,2,"HCD"
"[UNIMOD:737]-PEPTIDE",30,2,"HCD"
"[UNIMOD:737]-M[UNIMOD:35]EC[UNIMOD:4]TIDEK[UNIMOD:737]",35,1,"CID"
Loading
Loading