evaluate_tool.py
from difflib import SequenceMatcher
import os
import csv
import re
import string
import pandas as pd
import glob
import json
from collections import defaultdict
from mylib import string_lib as str_lib
from tqdm import tqdm
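

# Overview: this module compares manually annotated gene names per figure/PMCID
# (ALL_GENES_FILE) against gene names predicted by NER tools (BioBERT, Neji,
# BERN, HUGO), resolves aliases via data/unite_gene_alias.csv, fuzzy-matches
# predictions to the annotations, and writes per-PMCID match counts and scores
# to CSV/JSON.

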
class evaluate:
    def __init__(self) -> None:
        self.ALL_GENES_FILE = "data/manual_annotations/45_all_text.csv"
        self.ALVA_BIOBERT_PATH = "data/alvaroalon2_biobert_genetic_ner_results/"
        # self.GENE_NAMES_PUBTATOR = "data/gene_names_from_pubtator/"
        self.GENE_NAMES_PUBTATOR = "../Fei/gene_names_from_pubtator/"
        self.gene_results = [gene_file.split("_")[1].split(".")[0] for gene_file in os.listdir(self.ALVA_BIOBERT_PATH)]

    def retrieve_pmcids_from_path_figs(self):
        genes_per_pmcid = defaultdict(list)
        with open(self.ALL_GENES_FILE, mode='r') as csv_file:
            csv_reader = csv.DictReader(csv_file)
            for row in csv_reader:
                pmcid = row["fig_name"].split('_')[0]
                if pmcid in self.gene_results:
                    genes_per_pmcid[pmcid].append(row["display_text"].strip().lower())
        return genes_per_pmcid

    # creating a dictionary to store PMCIDs as keys and the genes predicted by BioBERT as values
    def retrieve_ner_results(self, path):
        genes_per_pred = defaultdict(list)
        for result_file in os.listdir(path):
            pmcid = result_file.split("_")[1].split(".")[0]
            with open(path + result_file, 'r', encoding="UTF8") as f:
                pmcid_genes = list(set(f.read().lower().strip().split('\n')))
                genes_per_pred[pmcid].extend(pmcid_genes)
        return genes_per_pred

    def retreive_human_gene_alias(self):
        human_gene_dict = defaultdict(list)
        with open("data/unite_gene_alias.csv", mode='r', encoding="UTF8", newline='') as csv_file:
            csv_reader = csv.reader(csv_file)
            for row in csv_reader:
                lower_cased_row = [r.lower().strip() for r in row]
                human_gene_dict[row[0]].extend(lower_cased_row)
        return human_gene_dict

    def human_gene_reference(self, gene_list):
        human_gene_dict = self.retreive_human_gene_alias()
        gene_ref = {}
        for gene in gene_list:
            for key, value in human_gene_dict.items():
                record = []
                record.append(key.lower())
                record.extend(value)
                if gene in record:
                    gene_ref[gene] = record[0]
            if gene not in gene_ref.keys():
                gene_ref[gene] = gene
        return gene_ref

    # Name resolution for gene names from the predicted gene outputs
    def resolve_ref_file(self, gene_dict, file_name):
        with open(file_name, newline='', mode='w', encoding="UTF8") as csv_file:
            csv_columns = ['Gene', 'Reference']
            csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(csv_columns)
            for key, value in gene_dict.items():
                csv_writer.writerow([key, value])

    def write_csv_to_pandas(self, directory):
        csv_files = glob.glob(directory + "/*.csv")
        pd_list = []
        for filename in csv_files:
            df = pd.read_csv(filename, index_col=None, header=0)
            pd_list.append(df)
        frame = pd.concat(pd_list, axis=0, ignore_index=True)
        new_df = frame.drop_duplicates(
            subset=['Gene', 'Reference'],
            keep='last').reset_index(drop=True)
        new_df.to_csv('data/gene_reference_dict.csv', index=False)

    def normalize_gene_names(self, gene_dict):
        ref_dict = pd.read_csv('data/gene_reference_dict.csv')
        formatted_dict = defaultdict(list)
        for key in gene_dict.keys():
            values = gene_dict[key]
            values_arr = []
            for value in values:
                data = ref_dict[ref_dict["Gene"] == value]['Reference']
                values_arr.append(data.to_string().split(' ')[1].strip())
            formatted_dict[key].append(values_arr)
        return formatted_dict

    def convert_csv_files_to_list(self, csv_file):
        gene_list = []
        with open(csv_file, mode='r', encoding="UTF8", newline='') as f:
            csv_reader = csv.DictReader(f)
            for row in csv_reader:
                row_values = list(row.values())
                gene_string = str_lib.str_list_ops.format_gene_array(''.join(str(e) for e in row_values))
                gene_string = [gene.lower() for gene in gene_string]
                gene_list.extend(gene_string)
        return gene_list

    def fuzzy_rule(self, name):
        name = name.upper()
        name = name.replace(' ', '')  # remove spaces
        name = name.replace('.', '')  # remove dots
        name = re.sub(u"\\(.*?\\)", "", name)  # remove brackets and their content
        name = name.replace('(', "").replace(')', "")  # remove any unmatched brackets
        name = name.rstrip(string.digits)  # strip trailing digits
        if name.find("-") >= 0:
            if not name[name.find("-") + 1:len(name)].isalpha() or len(name[name.find("-") + 1:len(name)]) < len(
                    name[0:name.find("-")]):
                name = name[0:name.find("-")]
            if name.find("-") < 2:
                name = name[name.find("-") + 1:len(name)]
        # name = name.replace("-", "")
        return name
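
    # Hedged illustration (not from the original source): with the rules above,
    # a mention such as "il-6" is upper-cased, its trailing digit is stripped
    # and the dangling hyphen segment is dropped, leaving "IL"; parenthesised
    # qualifiers such as "(human)" are removed outright.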

    def string_similarity(self, a, b):
        return SequenceMatcher(None, a, b).ratio()

    def fuzzy_gene_match(self, gene_dict_list, tokens_from_text_list, similarity_threshold):
        gene_list = []
        for gene in tqdm(gene_dict_list):
            gene = self.fuzzy_rule(gene)
            for token in tokens_from_text_list:
                if self.string_similarity(token, gene) >= similarity_threshold:
                    # if token == gene or token in gene or self.string_similarity(token, gene) >= similarity_threshold:
                    gene_list.append(token)
        return gene_list

    # writing out the matching genes with scores to csv
    def write_output_to_csv(self, file, genes_per_pmcid, genes_per_pred):
        with open(file, newline='', mode='w') as mf:
            csv_writer = csv.writer(mf, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(["S/N", "PMCID", "GENES", "MATCH COUNT", "SCORE"])
            count = 0
            # genes_per_pred = self.normalize_gene_names(genes_per_pred)
            for key, value in genes_per_pmcid.items():
                count += 1
                # matches = str_lib.str_list_ops.common_elements(genes_per_pred[key][0], value)
                matches = list(set(self.fuzzy_gene_match(genes_per_pred[key], value, 0.1)))
                matches = [match for match in matches if len(match.strip()) > 0]
                total_matches = len(matches)
                score = round(str_lib.str_list_ops.match_ratio(value, matches), 4)
                matches = ",".join(matches)
                csv_writer.writerow([count, key, matches, total_matches, score])

    def calculate_match_score(self, genes_per_pred, output_file):
        genes = []
        for gene in list(genes_per_pred):
            gene = [g.strip() for g in gene]
            genes.extend(gene)
        genes_dict = self.human_gene_reference(genes)
        self.resolve_ref_file(genes_dict, output_file)

    def write_combined_predicted_outputs(self, file, genes_per_pmcid, biobert, neji, bern, hugo):
        json_dict = defaultdict(list)
        with open(file, newline='', mode='w') as mf:
            csv_writer = csv.writer(mf, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(["S/N", "PMCID", "GENES", "MATCH COUNT", "SCORE"])
            count = 0
            # genes_per_pred = self.normalize_gene_names(genes_per_pred)
            for key, value in genes_per_pmcid.items():
                count += 1
                # matches = str_lib.str_list_ops.common_elements(genes_per_pred[key][0], value)
                match_bert = list(set(self.fuzzy_gene_match(biobert[key], value, 0.1)))
                match_neji = list(set(self.fuzzy_gene_match(neji[key], value, 0.1)))
                match_bern = list(set(self.fuzzy_gene_match(bern[key], value, 0.1)))
                match_hugo = list(set(self.fuzzy_gene_match(hugo[key], value, 0.1)))
                matches = list(set(match_bert + match_neji + match_bern + match_hugo))
                matches = [match for match in matches if len(match.strip()) > 0]
                total_matches = len(matches)
                score = round(str_lib.str_list_ops.match_ratio(value, matches), 4)
                matches = ",".join(matches)
                json_dict[key].append(matches)
                csv_writer.writerow([count, key, matches, total_matches, score])
        # json_data = json.dumps(json_dict, indent=2)
        with open("data/output.json", "w") as f:
            json.dump(json_dict, f, indent=3)

    def write_pred_results_to_json(self, gene_dict):
        json_dict = defaultdict(list)
        stopwords = str_lib.str_list_ops.return_stop_words()
        for key, value in gene_dict.items():
            formatted_values = [val for val in value if val not in stopwords]
            formatted_values = [val for val in formatted_values if not val.isdigit()]
            formatted_values = [val for val in formatted_values if len(val) > 2]
            json_dict[key].extend(formatted_values)
        with open("data/hugo_output.json", "w") as f:
            json.dump(json_dict, f, indent=3)

    # def compare_pred_with_pathway(self, pred_dict, pathway):
    #     match_dict = defaultdict(list)
    #     for key, value in pathway.items():
    #         matches = str_lib.str_list_ops.fuzzy_gene_match(pred_dict[key], value, 0.5)
    #         match_dict[key].append(matches)
    #     return match_dict
    #     with open('data/scores/match_hugo_ner.csv', newline='', mode='w', encoding="UTF8") as csv_file:
    #         csv_columns = ["S/N", "PMCID", "GENES", "MATCH COUNT", "SCORE"]
    #         csv_writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    #         csv_writer.writerow(csv_columns)
    #         for key, value in match_dict.items():
    #             csv_writer.writerow([key, value])
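

# Minimal usage sketch (assumed driver, not part of the original file). It
# presumes the data files referenced in __init__ exist locally; the output CSV
# path below is an illustrative choice, not one mandated by the repository.
if __name__ == "__main__":
    ev = evaluate()
    # manually annotated genes per PMCID (gold standard)
    gold = ev.retrieve_pmcids_from_path_figs()
    # BioBERT NER predictions per PMCID
    pred = ev.retrieve_ner_results(ev.ALVA_BIOBERT_PATH)
    # fuzzy-match predictions against the gold genes and write scores to CSV
    ev.write_output_to_csv("data/scores/match_biobert_ner.csv", gold, pred)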