-
Notifications
You must be signed in to change notification settings - Fork 10
/
kBestViterbi.py
197 lines (148 loc) · 5.98 KB
/
kBestViterbi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import numpy as np
import pandas as pd
import networkx as nx
import networkx_viterbi as nxv
import heapq
import itertools
#Exhaustive search for verification
def exhaustive(pi, A, O, observations):
M = len(observations)
S = pi.shape[0]
scores = []
# track the running best sequence and its score
best = (None, float('-inf'))
# loop over the cartesian product of |states|^M
for ss in itertools.product(range(S), repeat=M):
# score the state sequence
score = pi[ss[0]] * O[ss[0], observations[0]]
for i in range(1, M):
score *= A[ss[i - 1], ss[i]] * O[ss[i], observations[i]]
# update the running best
if score > best[1]:
best = (ss, score)
scores.append((score, ss))
return best, scores
#Classic Parallel LVA Decoder using heaps and rankings
def kViterbiParallel(pi, a, b, obs, topK):
if topK == 1:
return viterbi(pi, a, b, obs)
nStates = np.shape(b)[0]
T = np.shape(obs)[0]
assert (topK <= np.power(nStates, T)), "k < nStates ^ topK"
# delta --> highest probability of any path that reaches point i
delta = np.zeros((T, nStates, topK))
# phi --> argmax
phi = np.zeros((T, nStates, topK), int)
#The ranking of multiple paths through a state
rank = np.zeros((T, nStates, topK), int)
# for k in range(K):
for i in range(nStates):
delta[0, i, 0] = pi[i] * b[i, obs[0]]
phi[0, i, 0] = i
#Set the other options to 0 initially
for k in range(1, topK):
delta[0, i, k] = 0.0
phi[0, i, k] = i
#Go forward calculating top k scoring paths
# for each state s1 from previous state s2 at time step t
for t in range(1, T):
for s1 in range(nStates):
h = []
for s2 in range(nStates):
# y = np.sort(delta[t-1, s2, :] * a[s2, s1] * b[s1, obs[t]])
for k in range(topK):
prob = delta[t - 1, s2, k] * a[s2, s1] * b[s1, obs[t]]
# y_arg = phi[t-1, s2, k]
state = s2
# Push the probability and state that led to it
heapq.heappush(h, (prob, state))
#Get the sorted list
h_sorted = [heapq.heappop(h) for i in range(len(h))]
h_sorted.reverse()
#We need to keep a ranking if a path crosses a state more than once
rankDict = dict()
#Retain the top k scoring paths and their phi and rankings
for k in range(0, topK):
delta[t, s1, k] = h_sorted[k][0]
phi[t, s1, k] = h_sorted[k][1]
state = h_sorted[k][1]
if state in rankDict:
rankDict[state] = rankDict[state] + 1
else:
rankDict[state] = 0
rank[t, s1, k] = rankDict[state]
# Put all the last items on the stack
h = []
#Get all the topK from all the states
for s1 in range(nStates):
for k in range(topK):
prob = delta[T - 1, s1, k]
#Sort by the probability, but retain what state it came from and the k
heapq.heappush(h, (prob, s1, k))
#Then get sorted by the probability including its state and topK
h_sorted = [heapq.heappop(h) for i in range(len(h))]
h_sorted.reverse()
# init blank path
path = np.zeros((topK, T), int)
path_probs = np.zeros((topK, T), float)
#Now backtrace for k and each time step
for k in range(topK):
#The maximum probability and the state it came from
max_prob = h_sorted[k][0]
state = h_sorted[k][1]
rankK = h_sorted[k][2]
#Assign to output arrays
path_probs[k][-1] = max_prob
path[k][-1] = state
#Then from t down to 0 store the correct sequence for t+1
for t in range(T - 2, -1, -1):
#The next state and its rank
nextState = path[k][t+1]
#Get the new state
p = phi[t+1][nextState][rankK]
#Pop into output array
path[k][t] = p
#Get the correct ranking for the next phi
rankK = rank[t + 1][nextState][rankK]
return path, path_probs, delta, phi
# define Viterbi algorithm for shortest path
# code adapted from Stephen Marsland's, Machine Learning An Algorthmic Perspective, Vol. 2
# https://github.com/alexsosn/MarslandMLAlgo/blob/master/Ch16/HMM.py
def viterbi(pi, a, b, obs):
nStates = np.shape(b)[0]
T = np.shape(obs)[0]
# init blank path
path = np.zeros(T, int)
# delta --> highest probability of any path that reaches state i
delta = np.zeros((nStates, T), float)
# phi --> argmax by time step for each state
phi = np.zeros((nStates, T), int)
# init delta and phi
delta[:, 0] = pi * b[:, obs[0]]
phi[:, 0] = 0
print('\nStart Walk Forward\n')
# the forward algorithm extension
for t in range(1, T):
for s in range(nStates):
delta[s, t] = np.max(delta[:, t - 1] * a[:, s]) * b[s, obs[t]]
phi[s, t] = np.argmax(delta[:, t - 1] * a[:, s])
print('s={s} and t={t}: phi[{s}, {t}] = {phi}'.format(s=s, t=t, phi=phi[s, t]))
# find optimal path
print('-' * 50)
print('Start Backtrace\n')
path[T - 1] = np.argmax(delta[:, T - 1])
# p('init path\n t={} path[{}-1]={}\n'.format(T-1, T, path[T-1]))
for t in range(T - 2, -1, -1):
path[t] = phi[path[t + 1], [t + 1]]
# p(' '*4 + 't={t}, path[{t}+1]={path}, [{t}+1]={i}'.format(t=t, path=path[t+1], i=[t+1]))
print('path[{}] = {}'.format(t+1, path[t+1]))
print('path[{}] = {}'.format(t, path[t]))
max_prob = np.max(delta[:, T-1])
return path, delta, phi, max_prob
if __name__ == '__main__':
#some models to try out
from model_wiki import *
# from model_tcohn import *
path, delta, phi, max_prob = viterbi(pi, a, b, obs)
path, delta, phi, max_prob = kViterbiParallel(pi, a, b, obs, 8)
paths = nxv.kViterbiGraph(pi, a, b, obs, 8)