Commit

temporary commit to check d_to_hdf function
picciama committed Jan 24, 2024
1 parent 5188c34 commit 4bd53b4
Showing 2 changed files with 15 additions and 220 deletions.
2 changes: 1 addition & 1 deletion spectrum_io/d/__init__.py
@@ -1,6 +1,6 @@
"""Init raw."""
import logging

from .bruker import convert_d_pkl
from .bruker import convert_d_hdf, read_and_aggregate_timstof

logger = logging.getLogger(__name__)
233 changes: 14 additions & 219 deletions spectrum_io/d/bruker.py
@@ -33,197 +33,6 @@ def _sanitize_columns(df: pd.DataFrame) -> pd.DataFrame:
return df


def _chunk_merge(df1: pd.DataFrame, df2: pd.DataFrame, common_column: str, chunk_size: int = 100000) -> pd.DataFrame:
"""
Merge two DataFrames in chunks based on a common column.
Splits both input DataFrames into chunks of the specified size and merges every pair of chunks
on the provided common column. The merged chunks are then filtered on the scan-range condition
before being concatenated into the final merged DataFrame.
:param df1: First pd.DataFrame to be merged.
:param df2: Second pd.DataFrame to be merged.
:param common_column: Name of the column on which the DataFrames will be merged.
:param chunk_size: Size of chunks used for splitting the DataFrames.
:return: Merged pd.DataFrame containing the results of merging the input DataFrames in chunks.
"""
""" # TODO condition first, then merge, instead of chunk merge, then filter, then assemble
SELECT * FROM
df1.join(df2) on df1.common_column == df2.common_column
AND df2.SCANNUMBEGIN <= df1.SCAN <= df.SCANNUMEND
WHERE df2.SCANNUMBEGIN <= df1.SCAN <= df.SCANNUMEND
"""

merged_chunks = []
# Split both DataFrames into chunks
chunks_df1 = [df1[i : i + chunk_size] for i in range(0, len(df1), chunk_size)]
chunks_df2 = [df2[i : i + chunk_size] for i in range(0, len(df2), chunk_size)]
# Iterate through the chunks and merge them
for chunk1 in chunks_df1:
for chunk2 in chunks_df2:
merged_chunk = pd.merge(chunk1, chunk2, on=common_column)
merged_chunk = merged_chunk[
(merged_chunk["SCANNUMBEGIN"] <= merged_chunk["SCAN"])
& (merged_chunk["SCAN"] <= merged_chunk["SCANNUMEND"])
]
merged_chunks.append(merged_chunk)
# TODO: break out of the loop early once no further matching chunk pairs are possible
# Concatenate the merged chunks to get the final result
merged_df = pd.concat(merged_chunks, ignore_index=True)
return merged_df
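

# Illustrative usage sketch (not part of the original module): _chunk_merge on toy
# DataFrames. Column names mirror the real call site in load_timstof; the data and
# the helper name _demo_chunk_merge are hypothetical.
def _demo_chunk_merge() -> pd.DataFrame:
    df1 = pd.DataFrame({"FRAME": [1, 1, 2], "SCAN": [5, 50, 7]})
    df2 = pd.DataFrame({"FRAME": [1, 2], "SCANNUMBEGIN": [0, 0], "SCANNUMEND": [10, 10]})
    # Rows (FRAME=1, SCAN=5) and (FRAME=2, SCAN=7) survive the scan-range filter;
    # (FRAME=1, SCAN=50) is dropped because 50 > SCANNUMEND.
    return _chunk_merge(df1, df2, common_column="FRAME", chunk_size=2)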


def _load_maxquant_txt(maxquant_out_path: Path, file_name: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Load information files from MaxQuant output.
This function searches for specific text files in the provided MaxQuant output folder,
filters entries based on the given file name, and returns the content of these files as
pd.DataFrames.
:param maxquant_out_path: Path to the MaxQuant output folder.
:param file_name: Raw file name used to filter entries in the loaded txt files.
:raises AssertionError: If any of the requested text files does not contain any entries
for the provided file name.
:return: Tuple of pd.DataFrames with the data from the loaded txt files
(msms.txt, accumulatedMsmsScans.txt, pasefMsmsScans.txt).
"""
df_msms = pd.read_csv(maxquant_out_path / "msms.txt", sep="\t")
df_precursors = pd.read_csv(maxquant_out_path / "accumulatedMsmsScans.txt", sep="\t")
df_pasef = pd.read_csv(maxquant_out_path / "pasefMsmsScans.txt", sep="\t")
# Rebind explicitly; reassigning the loop variable would not modify the original DataFrames
df_msms = _sanitize_columns(df_msms)
df_precursors = _sanitize_columns(df_precursors)
df_pasef = _sanitize_columns(df_pasef)
df_msms = df_msms[df_msms["RAW_FILE"] == file_name]
df_precursors = df_precursors[df_precursors["RAW_FILE"] == file_name]
df_pasef = df_pasef[df_pasef["RAW_FILE"] == file_name]
if df_msms.empty:
    raise AssertionError(f"No entries for {file_name} found in msms.txt. Check the raw file name.")
if df_precursors.empty:
    raise AssertionError(f"No entries for {file_name} found in accumulatedMsmsScans.txt. Check the raw file name.")
if df_pasef.empty:
    raise AssertionError(f"No entries for {file_name} found in pasefMsmsScans.txt. Check the raw file name.")
return df_msms, df_precursors, df_pasef
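

# Illustrative usage sketch (not part of the original module): the folder and raw-file
# name below are hypothetical placeholders; _demo_load_maxquant_txt is not a real helper.
def _demo_load_maxquant_txt() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    maxquant_out = Path("/path/to/maxquant/txt")  # folder containing the three txt files
    # Raises AssertionError if any of the txt files has no rows for this raw file name.
    return _load_maxquant_txt(maxquant_out, "my_raw_file")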


def _load_metadata(
out_path: Path, file_name: str, search_engine: str
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Load metadata based on the search engine used in MaxQuant output.
Depending on the specified search engine, this function loads metadata from the MaxQuant
output or raises an error if the search engine is not supported.
:param out_path: Path to the search engine output folder.
:param file_name: Raw file name used to filter entries in the loaded txt files.
:param search_engine: Search engine used for processing the data (e.g., 'maxquant', 'sage', 'msfragger').
:raises NotImplementedError: If the search engine is 'sage' or 'msfragger'.
:raises ValueError: If the search engine is not recognized.
:return: Tuple of pd.DataFrames with metadata from the specified search engine.
"""
if search_engine == "maxquant":
return _load_maxquant_txt(out_path, file_name)
if search_engine == "sage":
raise NotImplementedError("Oktoberfest does not yet support results from SAGE.")
if search_engine == "msfragger":
raise NotImplementedError("Oktoberfest does not yet support results from MSFragger.")
raise ValueError(f"{search_engine} is not recognized.")


def load_timstof(d_path: Path, out_path: Path, search_engine: str = "maxquant") -> pd.DataFrame:
"""
Load timsTOF data and merge with metadata from search engine output.
This function loads raw timsTOF data and merges it with metadata (precursor, frame, scan information)
obtained from the search engine output.
:param d_path: Path to the raw timsTOF data.
:param out_path: Path to the search engine output folder.
:param search_engine: Search engine used for processing the data ('maxquant', 'sage', 'msfragger').
:return: DataFrame containing combined information from the timsTOF data and search engine metadata.
"""
# Load the raw bruker data
logger.info("Loading data...")
data = alphatims.bruker.TimsTOF(str(d_path))
# Get the filename to ensure the correct data is mapped
file_name = d_path.stem
# Load the PSMs, precursor and frame information
df_msms, df_precursors, df_pasef = _load_metadata(out_path, file_name, search_engine)
df_precursors = df_precursors[df_precursors["SCAN_NUMBER"].isin(df_msms.SCAN_NUMBER)]
df_precursors["PRECURSOR"] = df_precursors["PASEF_PRECURSOR_IDS"].str.split(";")
df_precursors = df_precursors.explode("PRECURSOR", ignore_index=True).drop_duplicates()
df_precursors["PRECURSOR"] = df_precursors["PRECURSOR"].astype(int)
# Get which precursors are in which scan
scan_precursor_map = df_precursors[["SCAN_NUMBER", "PRECURSOR"]].drop_duplicates()
df_pasef = df_pasef[df_pasef["PRECURSOR"].isin(df_precursors.PRECURSOR)]
df_pasef = df_pasef.rename(columns={"COLLISIONENERGY": "COLLISION_ENERGY"})

# Get where each frame starts and ends
df_pasef = df_pasef[["PRECURSOR", "FRAME", "SCANNUMBEGIN", "SCANNUMEND", "COLLISION_ENERGY"]].drop_duplicates()

# Get the frames from the raw bruker data
data = data[
    df_pasef.FRAME
]  # TODO: only read the frames we need to begin with; do we need the duplicates or should we filter them out?
data = _sanitize_columns(data)
data = data.rename(columns={"MOBILITY": "INV_ION_MOBILITY", "RT": "RETENTION_TIME"})

data = data[["FRAME", "SCAN", "TOF", "INTENSITY", "MZ", "INV_ION_MOBILITY", "RETENTION_TIME"]]
# Map scan information to the raw bruker data
df_raw_mapped = _chunk_merge(df1=data, df2=df_pasef, common_column="FRAME") # TODO check frame is not frame_indices
df_raw_mapped = df_raw_mapped.drop_duplicates() # TODO check if necessary
# Combine the MZ and INTENSITY information on frame-level
df_raw_mapped_test = (
df_raw_mapped.groupby(["PRECURSOR", "FRAME"])
.agg(
{
"INTENSITY": list,
"MZ": list,
"RETENTION_TIME": "first",
"COLLISION_ENERGY": "first",
"INV_ION_MOBILITY": "first",
}
)
.reset_index()
)
# Combine the MZ and INTENSITY information on the summed scan-level
df_scans = (
df_raw_mapped_test.merge(scan_precursor_map) # TODO check if this is correct
.groupby("SCAN_NUMBER")
.agg(
median_CE=("COLLISION_ENERGY", "median"),
combined_INTENSITIES=("INTENSITY", lambda x: [item for sublist in x for item in sublist]),
combined_MZ=("MZ", lambda x: [item for sublist in x for item in sublist]),
median_RETENTION_TIME=("RETENTION_TIME", "median"),
median_INV_ION_MOBILITY=("INV_ION_MOBILITY", "median"),
)
.reset_index()
)

# TODO: move this into the logic of oktoberfest; not needed downstream
# Get the CHARGE, MASS_ANALYZER, RAW_FILE, and FRAGMENTATION from the msms.txt file
df_msms_scans = pd.merge(df_scans, df_msms, on="SCAN_NUMBER")
df_msms_scans = df_msms_scans[
[
"RAW_FILE",
"SCAN_NUMBER",
"combined_INTENSITIES",
"combined_MZ",
"MASS_ANALYZER",
"FRAGMENTATION",
"median_RETENTION_TIME",
"median_INV_ION_MOBILITY",
"median_CE",
"CHARGE",
]
]
logger.info("Done loading data.")
return df_msms_scans
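

# Illustrative usage sketch (not part of the original module): end-to-end loading of a
# .d folder together with MaxQuant txt output. Both paths and the demo helper name are
# hypothetical; the raw file stem must match the RAW_FILE entries in the txt files.
def _demo_load_timstof() -> pd.DataFrame:
    d_path = Path("/data/my_raw_file.d")  # raw timsTOF acquisition folder
    out_path = Path("/data/maxquant/txt")  # MaxQuant output folder with msms.txt etc.
    # One row per MS2 scan: combined m/z and intensity lists plus median RT, CE and mobility.
    return load_timstof(d_path, out_path, search_engine="maxquant")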


def binning(inp: pd.DataFrame, ignore_charges: bool, rescoring_path: Path) -> pd.DataFrame:
"""
Perform binning on the input MasterSpectrum.
@@ -257,22 +66,22 @@ def binning(inp: pd.DataFrame, ignore_charges: bool, rescoring_path: Path) -> pd.DataFrame:
return comb_ms


def aggregate_timstof(df_msms_scans: pd.DataFrame, temp_path: Path, chunk_size: int = 1000) -> pd.DataFrame:
def aggregate_timstof(raw_spectra: pd.DataFrame, temp_path: Path, chunk_size: int = 1000) -> pd.DataFrame:
"""
Combine spectra from the provided pd.DataFrame and perform binning on chunks.
This function splits the input pd.DataFrame into chunks, performs binning on each chunk of data,
merges the binning results with the original data, processes the combined spectra, and returns
the combined and processed spectra as a pd.DataFrame.
:param df_msms_scans: pd.DataFrame containing spectra information.
:param raw_spectra: pd.DataFrame containing spectra information.
:param temp_path: Path used for intermediate results during binning.
:param chunk_size: Size of chunks used for splitting the DataFrame.
:return: pd.DataFrame containing combined and processed spectra.
"""
chunk_list = []
# Split both DataFrames into chunks
chunks = [df_msms_scans[i : i + chunk_size] for i in range(0, len(df_msms_scans), chunk_size)]
chunks = [raw_spectra[i : i + chunk_size] for i in range(0, len(raw_spectra), chunk_size)]
for chunk in chunks:
bin_result_list = []
for _, line in chunk.iterrows():
@@ -301,22 +110,11 @@ def aggregate_timstof(df_msms_scans: pd.DataFrame, temp_path: Path, chunk_size: int = 1000) -> pd.DataFrame:
return chunk_comb
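

# Illustrative usage sketch (not part of the original module): aggregate_timstof bins
# the per-scan spectra returned by load_timstof chunk by chunk; temp_path only holds
# intermediate binning results. The path and the demo helper name are hypothetical.
def _demo_aggregate_timstof(raw_spectra: pd.DataFrame) -> pd.DataFrame:
    return aggregate_timstof(raw_spectra, temp_path=Path("/tmp/binning"), chunk_size=1000)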


def read_timstof(d_path, search_output_path):
def read_timstof(d_path, scan_to_precursor_map):
# preparation of filter
df_msms, df_precursors, df_pasef = _load_maxquant_txt(mq_search_path, "AspN_F1_E1_1_4468")
df_precursors.columns = ["Raw file", "SCAN_NUMBER", "PRECURSOR"]
df_precursor_map = (
df_precursors.query("SCAN_NUMBER in @df_msms.SCAN_NUMBER")
.set_index("SCAN_NUMBER")["PRECURSOR"]
.str.split(";")
.explode()
.astype("int")
)

df_pasef = df_pasef.query("PRECURSOR in @df_precursor_map").sort_values(["FRAME", "PRECURSOR"])

df_frame_group = (
df_pasef[["FRAME", "PRECURSOR"]]
scan_to_precursor_map[["FRAME", "PRECURSOR"]]
.groupby("FRAME", as_index=False)
.agg(
{
@@ -355,7 +153,7 @@ def read_timstof(d_path, search_output_path):

# aggregation
df_combined_grouped = (
df.merge(df_pasef)
df.merge(scan_to_precursor_map)
.query("SCANNUMBEGIN <= SCAN <= SCANNUMEND") # can probably be skipped
.groupby(["PRECURSOR", "FRAME"], as_index=False) # aggregate fragments per precursor in FRAME
.agg(
@@ -367,7 +165,7 @@ def read_timstof(d_path, search_output_path):
"INV_ION_MOBILITY": "first",
}
)
.merge(df_precursor_map.reset_index())
.merge(scan_to_precursor_map.reset_index())
.groupby("SCAN_NUMBER", as_index=False) # aggregate PRECURSORS for same SCAN_NUMBER
.agg(
median_CE=("COLLISIONENERGY", "median"),
@@ -389,16 +187,13 @@ def convert_d_hdf(
data.save_to_hdf(directory=str(output_path.parent), filename=str(output_path.name))


def read_and_aggregate_timstof(d_path: Path, search_output_path: Path, output_path: Path):
def read_and_aggregate_timstof(source: Path, scan_to_precursor_map: pd.DataFrame, temp_path: Path):
"""
Converts a .d folder to pkl.
Read raw spectra from a timsTOF HDF spectra file and aggregate them to MS2 spectra.
:param d_path: Path to the .d folder
:param search_output_path: Path to the output folder from the search engine
:param output_path: file path of the pkl file
:param source: Path to the HDF file
:param scan_to_precursor_map: DataFrame mapping scan numbers to precursors
:param temp_path: Path used for intermediate results during binning
"""
df_msms_scans = read_timstof(d_path, search_output_path) # ready
df_combined = aggregate_timstof(df_msms_scans, output_path)
# Write to pickle
file_name = df_combined["RAW_FILE"][0]
df_combined.to_pickle(output_path / f"{file_name}.pkl")
raw_spectra = read_timstof(source, scan_to_precursor_map)  # ready
df_combined = aggregate_timstof(raw_spectra, temp_path)
return df_combined
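

# Illustrative usage sketch (not part of the original module): wiring the new pieces
# together. The HDF file is assumed to come from convert_d_hdf, and the columns of the
# scan_to_precursor_map are inferred from read_timstof; all inputs are hypothetical.
def _demo_read_and_aggregate() -> pd.DataFrame:
    source = Path("/data/my_raw_file.hdf")
    scan_to_precursor_map = pd.DataFrame(
        {
            "SCAN_NUMBER": [1],
            "PRECURSOR": [42],
            "FRAME": [100],
            "SCANNUMBEGIN": [0],
            "SCANNUMEND": [10],
            "COLLISIONENERGY": [27.5],
        }
    )
    return read_and_aggregate_timstof(source, scan_to_precursor_map, temp_path=Path("/tmp/binning"))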
