Skip to content

Commit

Permalink
refactor: major refactoring of transcript-sampler
Browse files Browse the repository at this point in the history
  • Loading branch information
balajtimate committed Oct 20, 2023
1 parent 123bd5e commit 7dd3860
Showing 1 changed file with 44 additions and 97 deletions.
141 changes: 44 additions & 97 deletions scRNAsim_toolz/transcript_sampler/find_reptrans.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Find representative transcripts."""
import logging
from typing import Union
import pandas as pd # type: ignore

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -95,125 +96,71 @@ def get_rep_trans(self, file_name: str) -> dict:
Raises:
ValueError: If an unexpected entry is encountered in the GTF file.
"""
# setting default variables
rep_transcripts: dict = {}
cur_g_id = ""
cur_t_id = ""
pot_best_trans: list = []
cur_best_trans: list = []
# [transcript_id, transcript_support_level, transcript_length]
cur_best_trans = ["", 100, 0]
result_list = []

with open(file_name, "r", encoding="utf-8") as file:
for line in file:
entry = line.split("\t")
entry = line.strip().split("\t")

# removes expected but unneeded entries
if len(entry) == 1 or entry[2] in [
"CDS", "stop_codon",
"five_prime_utr", "three_prime_utr",
"start_codon", "Selenocysteine"
]:
]:
continue

# this function turns the less organized part of the entry
# into a readable list
attributes = self.attributes_converter(entry[8])

# looking for and processing exons entries
if entry[2] == "exon":
if cur_g_id != attributes[1]:
LOG.error("Exon from an unexpected gene")
raise ValueError("Exon from an unexpected gene")
if (
self.find_in_attributes(
attributes, "transcript_id"
) != cur_t_id
):
LOG.error("Exon from an unexpected transcript")
raise ValueError("Exon from an unexpected transcript")

# adding the length of the exon to the appropriate list and
# checking for changes in best transcript
if pot_best_trans:
pot_best_trans[2] += int(entry[4]) - int(entry[3])
if pot_best_trans[2] > cur_best_trans[2]:
cur_best_trans = pot_best_trans
else:
cur_best_trans[2] += int(entry[4]) - int(entry[3])
# 1. GENE entries
if entry[2] == "gene":
cur_g_id = self.find_in_attributes(attributes, "gene_id")

# looking for and processing transcript entries
# 2. TRANSCRIPT entries
elif entry[2] == "transcript":
# verify that the gen is correct
if cur_g_id != attributes[1]:
LOG.error("Transcript from an unexpected gene")
raise ValueError("Transcript from an unexpected gene")

# finding the transcript id and the support level
cur_t_id = self.find_in_attributes(
attributes, "transcript_id"
)
t_supp_lvl: Union[int, str] = self.find_in_attributes(
tsl: Union[int, str] = self.find_in_attributes(
attributes, "transcript_support_level"
)

# If there is no transcript support level or the level is
# given as NA it is nomed as 100. else the transcript
# support level is turned into int
if t_supp_lvl == "NA":
t_supp_lvl = 100
else:
if isinstance(
t_supp_lvl, str
) and t_supp_lvl.isdigit():
t_supp_lvl = int(t_supp_lvl)
else:
t_supp_lvl = 100

# decides if the transcript has potential to become the
# representative transcript
if (
t_supp_lvl < cur_best_trans[1] or
cur_best_trans[0] == ""
):
cur_best_trans = [cur_t_id, t_supp_lvl, 0]
elif t_supp_lvl == cur_best_trans[1]:
pot_best_trans = [cur_t_id, t_supp_lvl, 0]

# looking for and processing gene entries
elif entry[2] == "gene":
# updating rep_transcripts dict
if cur_g_id in rep_transcripts:
if (rep_transcripts[cur_g_id][1] > cur_best_trans[1]
or (rep_transcripts[cur_g_id][1] ==
cur_best_trans[1]
and rep_transcripts[cur_g_id][2] <
cur_best_trans[2])):
rep_transcripts[cur_g_id] = cur_best_trans
if tsl == "NA":
tsl = 100
else:
rep_transcripts[cur_g_id] = cur_best_trans
tsl = int(tsl)

# updating cur_g_id and resetting cur_best_trans
cur_g_id = attributes[1]
cur_best_trans = ["", 100, 0]
result_list.append(
{"gene_id": cur_g_id, "transcript_id": cur_t_id,
"tsl": tsl, "exon_length": 0}
)

# raises an error for unidentifiable entries
else:
LOG.error("This entry could not be identified")
raise ValueError("This entry could not be identified")

# adding the final gene to the dictionary
if cur_g_id in rep_transcripts:
if (rep_transcripts[cur_g_id][1] > cur_best_trans[1] or
(rep_transcripts[cur_g_id][1] == cur_best_trans[1] and
rep_transcripts[cur_g_id][2] < cur_best_trans[2])):
rep_transcripts[cur_g_id] = cur_best_trans
else:
rep_transcripts[cur_g_id] = cur_best_trans

del rep_transcripts[""]
rep_transcripts = self.reformat_reptrans(rep_transcripts)
return rep_transcripts
# EXON entries
elif entry[2] == "exon":
exon_length = int(entry[4]) - int(entry[3])

for item in result_list:
if (
item["gene_id"] == cur_g_id
and item["transcript_id"] == cur_t_id
):
item["exon_length"] += exon_length # type: ignore

# Convert the list of dictionaries into a DataFrame
df = pd.DataFrame(result_list)

# Create dictionary of representative transcripts
sorted_df = df.sort_values(
by=['gene_id', 'tsl', 'exon_length'], ascending=[True, True, False]
)

# Keep the first row for each gene_id (best representative transcript)
best_rep_df = sorted_df.drop_duplicates(subset='gene_id', keep='first')

# Create a dictionary with gene_id as keys and transcript_id as values
best_rep_transcripts = best_rep_df.set_index(
'gene_id'
)['transcript_id'].to_dict()

return best_rep_transcripts

def gtf_file_writer(self, original_file: str,
rep_transcript_dict: dict, output_file: str):
Expand Down

0 comments on commit 7dd3860

Please sign in to comment.