-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLAI_hmm_script.py
212 lines (178 loc) · 9.33 KB
/
LAI_hmm_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import random as rn
import numpy as np
import pandas as pd
from tqdm import tqdm
import math
def HammingDist(str1,str2):
"""
This function computes the Hamming distance (edit distance) between 2 strings.
"""
dist = 0
for i in range(len(str1)):
if str1[i] != str2[i]:
dist += 1
return dist
def makeSNPseq(df,col1,col2):
"""
This function takes a dataframe containing a single individual's SNP genotypes
at various positions. It combines each row into one haplotype string (2nt) and
returns these as a list.
"""
snps = []
for i in tqdm(range(len(df)), desc="makeSNPseq"):
snps.append(df.iloc[i,col1]+df.iloc[i,col2])
return snps
def standardizeIndices(df1,df2,col_name):
"""
This function cuts down both df1 and df2 to only contain values in the col (col_name)
that exist in the intersection of both. In practice, this is used to make sure the SNP
POS match between an emission df and a genotype df for each creation of an HMM.
"""
df1_pos = list(df1.loc[:,col_name])
df2_pos = list(df2.loc[:,col_name])
intersect = set(df1_pos).intersection(set(df2_pos))
fin_df1 = df1[df1[col_name].isin(intersect)]
fin_df1.sort_values(col_name)
fin_df1.reset_index(inplace=True,drop=True)
fin_df2 = df2[df2[col_name].isin(intersect)]
fin_df2.sort_values(col_name)
fin_df2.reset_index(inplace=True,drop=True)
return fin_df1, fin_df2
class HMMOptimalPathLAI:
def __init__(self, PopA, PopB, emission_df, recombination_freq, SNPseq):
"""
- States = ["AA", "AB", "BA", "BB"] --> states = {0:"AA",1:"AB",2:"BA",3:"BB"}
- Emissions = we will calculate these as we go based on the emission_df (which contains
the prob of each nt for popA and popB)
- Alphabet = ["AA","AC","AG","AT","CA","CC","CG","CT","GA","GC","GG","GT","TA","TC","TG","TT"]
- Transitions = will build matrix using an input recombination frequency in separate function
"""
#read in states, make a pop code dictionary, and construct an indexing state_dict {index:state}
self.states = {0: "AA", 1: "AB", 2:"BA", 3:"BB"}
self.n_states = len(self.states.keys())
self.pop_codes = {'A':PopA, 'B':PopB}
#read in emissions_df and recombination frequency and initialize the transition matrix
self.emissions_df = emission_df
self.recomb_freq = recombination_freq
self.transitions = {0:[], 1:[], 2:[], 3:[]}
#read in the SNPseq as a list
self.sequence = SNPseq
self.alphabet = ["AA","AC","AG","AT","CA","CC","CG","CT","GA","GC","GG","GT","TA","TC","TG","TT"]
#pick a random starting state (0,1,2,3) and initialize the matrices for the Viterbi algorithm
self.current_state = rn.randint(0, self.n_states-1)
self.optimal_matrix = np.zeros((self.n_states, len(SNPseq)))
self.backtrack = np.zeros((self.n_states, len(SNPseq)))
self.final_path = []
def get_transition_matrix(self):
"""
This function creates the transition matrix based on the recombination rate given
during initialization. By calculating the Hamming Distance (edit distance) between
populations (AA,AB,BA,BB), we can see how many ancestries are changed in each
transition and the use this and the recombination rate to assign a transition rate.
"""
for i in range(self.n_states):
state1 = self.states[i]
for j in range(self.n_states):
state2 = self.states[j]
dist = HammingDist(state1,state2)
if dist == 0:
self.transitions[i].append((1-self.recomb_freq)**2)
elif dist == 1:
self.transitions[i].append(self.recomb_freq*(1-self.recomb_freq))
elif dist == 2:
self.transitions[i].append(self.recomb_freq**2)
def get_emissions_matrix(self,ind):
"""
This function takes a SNP position index (works bc we have matched indexing for genotype
and emissions), and uses the emission_df to output a 4x16 emission probability matrix
(rows = 4 ancestry pop combos, cols = 16 genotype combos). The genotype combos are ordered
lexicographically: ["AA","AC","AG","AT","CA","CC","CG","CT","GA","GC","GG","GT","TA","TC",
"TG","TT"].
"""
#go through the columns with pop_nt freqs and make the emissions matrix
emissions_mat = np.zeros((4,16),dtype=float)
for i in range(self.n_states):
pop1 = self.pop_codes[self.states[i][0]]
pop2 = self.pop_codes[self.states[i][1]]
for j in range(len(self.alphabet)):
nt1 = self.alphabet[j][0]
nt2 = self.alphabet[j][1]
col1 = list(self.emissions_df.columns).index(pop1+"_"+nt1)
col2 = list(self.emissions_df.columns).index(pop2+"_"+nt2)
prob1 = self.emissions_df.iloc[ind,col1]
prob2 = self.emissions_df.iloc[ind,col2]
emissions_mat[i,j] = prob1*prob2
return emissions_mat
def get_optimal_path(self):
"""
This function performs the Viterbi algorithm using the input sequence of SNPs.
For each SNP, a position (index) specific emissions matrix is calculated and then used
to calculate the highest possible probability of being in each state for each SNP.
A backtrack matrix is stored which will facilitate reconstruction of the most probable
path through states, and thus the haplotypes of a chromosome.
"""
#go through every SNP in the input sequence (i=index, c=SNP string)
for i, c in tqdm(enumerate(self.sequence), desc="get_optimal_path"):
#pull out the lexicographic indexing of the seen 2nt SNP (idx)
idx = self.alphabet.index(c)
#create the emissions matrix for this SNP with the enumerated index
snp_emissions = self.get_emissions_matrix(i)
#go through all states and calculate max probability based on emission and transition probs
for j in range(self.n_states):
#get the probability of emitting c (SNP with index idx) when in state j
emit_prob = snp_emissions[j][idx]
#if this is the first SNP, then just set all positions in the matrix to the emissions prob
#(this assumes equal prob of start in any state, which is fine here)
if i == 0:
self.optimal_matrix[j, i] = math.log10(emit_prob)
self.backtrack[j, i] = -1
#otherwise go through all possible previous states and keep track of transition
#and emission probabilities from these states to the current one
else:
probs = []
for previous in range(self.n_states):
transition_prob = self.transitions[previous][j]
probs.append(math.log10(emit_prob * transition_prob) + self.optimal_matrix[previous, i-1])
#select the max probability and record backtrack
max_prob = max(probs)
back = probs.index(max_prob)
self.optimal_matrix[j, i] = max_prob
self.backtrack[j, i] = back
def reconstruct_path(self):
"""
This function uses the backtrack list stored from the Viterbi algorithm to reconstruct
the optimal path of states through the input sequence (SNPs).
"""
#find the ending state (state with highest prob for final SNP) and add it to the path
end_values = [self.optimal_matrix[v, len(self.sequence) - 1] for v in range(self.n_states)]
max_prob = max(end_values)
end_state = end_values.index(max_prob)
path = [end_state]
previous_state = end_state
#now walk backwards through the backtrack list until reaching the start
for i in tqdm(range(len(self.sequence)-1, -1, -1), desc="reconstruct_path"):
#print(previous_state, i)
path.append(int(self.backtrack[previous_state, i]))
previous_state = int(self.backtrack[previous_state, i])
#reverse path and output with state names
path.reverse()
self.final_path = [self.states[p] for p in path if p != -1]
def output_path(self,chr_num):
"""
This function translates the final path to a dataframe of the desired output
format: cols = CHR, POS, POP1, POP2. It then returns this dataframe so it can
be viewed and saved externally.
"""
chrom = [chr_num] * len(self.final_path)
pos = list(self.emissions_df.loc[:,"POS"])
#reconstruct the two populations from the final path using the pop_codes dict
hap1 = []
hap2 = []
for gt in tqdm(self.final_path, desc="output_path"):
pop1 = gt[0]
pop2 = gt[1]
hap1.append(self.pop_codes[pop1])
hap2.append(self.pop_codes[pop2])
df_dict = {'CHR':chrom, 'POS':pos, 'POP1':hap1, 'POP2':hap2}
results_df = pd.DataFrame(df_dict)
return(results_df)