# forked from codekansas/keras-language-modeling
# answer_to_question.py — 129 lines (104 loc), 4.54 KB
from __future__ import print_function
import numpy as np
import os
from keras.engine import Input
from keras.layers import LSTM, RepeatVector, TimeDistributed, Dense, Activation
from keras.models import Model
# can remove this depending on ide...
os.environ['INSURANCE_QA'] = '/media/moloch/HHD/MachineLearning/data/insuranceQA/pyenc'
import sys

# Python 2's cPickle is a faster C implementation of the pickle module;
# fall back to plain pickle (which is the only module on Python 3).
# Catch ImportError specifically — a bare `except:` would also swallow
# unrelated errors such as KeyboardInterrupt.
try:
    import cPickle as pickle
except ImportError:
    import pickle
class InsuranceQA:
    """Loader for the pickled insurance_qa_python dataset.

    Reads the dataset directory from the INSURANCE_QA environment variable
    and exposes:
      - self.vocab: the pickled 'vocabulary' mapping (index -> word)
      - self.table: a VocabularyTable built over the vocabulary's words
    """

    def __init__(self):
        try:
            data_path = os.environ['INSURANCE_QA']
        except KeyError:
            print("INSURANCE_QA is not set. Set it to your clone of https://github.com/codekansas/insurance_qa_python")
            sys.exit(1)
        self.path = data_path
        self.vocab = self.load('vocabulary')
        self.table = InsuranceQA.VocabularyTable(self.vocab.values())

    def load(self, name):
        """Unpickle and return the dataset file *name* from self.path.

        Uses a context manager so the file handle is closed promptly; the
        original `pickle.load(open(...))` left the handle open until GC.
        """
        with open(os.path.join(self.path, name), 'rb') as f:
            return pickle.load(f)

    class VocabularyTable:
        ''' Identical to CharacterTable from Keras example '''

        def __init__(self, words):
            # Deduplicate and sort so index assignment is deterministic.
            self.words = sorted(set(words))
            self.words_indices = {w: i for i, w in enumerate(self.words)}
            self.indices_words = {i: w for i, w in enumerate(self.words)}

        def encode(self, sentence, maxlen):
            """One-hot encode *sentence* (an iterable of words) into a
            (maxlen, vocab_size) array, truncating after maxlen words.
            Words beyond maxlen are dropped; short sentences are padded
            with all-zero rows. Raises KeyError on out-of-vocabulary words.
            """
            indices = np.zeros((maxlen, len(self.words)))
            for i, w in enumerate(sentence):
                if i == maxlen:
                    break
                indices[i, self.words_indices[w]] = 1
            return indices

        def decode(self, indices, calc_argmax=True, noise=0):
            """Decode a (timesteps, vocab_size) score matrix back to a
            space-joined string. When calc_argmax is true, optional uniform
            noise is added before the argmax to diversify sampled output;
            noise=0 leaves the scores unchanged.
            """
            if calc_argmax:
                indices = indices + np.random.rand(*indices.shape) * noise
                indices = indices.argmax(axis=-1)
            return ' '.join(self.indices_words[x] for x in indices)
def get_model(question_maxlen, answer_maxlen, vocab_len, n_hidden):
    """Build and compile a sequence-to-sequence model that maps a one-hot
    encoded answer (answer_maxlen, vocab_len) to a one-hot encoded
    question (question_maxlen, vocab_len).

    Architecture: 3-layer LSTM encoder -> RepeatVector bridge ->
    3-layer LSTM decoder -> per-timestep Dense + softmax.
    """
    answer = Input(shape=(answer_maxlen, vocab_len))

    # Encoder: three stacked LSTMs; only the final layer collapses the
    # sequence into a single n_hidden-dimensional vector.
    encoded = answer
    for is_last in (False, False, True):
        encoded = LSTM(n_hidden, return_sequences=not is_last)(encoded)

    # Broadcast the fixed-size encoding across every decoder timestep.
    decoded = RepeatVector(question_maxlen)(encoded)

    # Decoder: three stacked LSTMs, each emitting the full sequence.
    for _ in range(3):
        decoded = LSTM(n_hidden, return_sequences=True)(decoded)

    # Project each timestep onto the vocabulary and normalize to a
    # per-timestep probability distribution.
    logits = TimeDistributed(Dense(vocab_len))(decoded)
    probs = Activation('softmax')(logits)

    model = Model([answer], [probs])
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model
if __name__ == '__main__':
    # Question length 10, answer length 40 (words per one-hot sequence).
    question_maxlen, answer_maxlen = 10, 40
    qa = InsuranceQA()
    batch_size = 50
    n_test = 5
    print('Generating data...')
    # 'answers': answer-id -> token list; 'train': list of question records.
    answers = qa.load('answers')
    questions = qa.load('train')

    def gen_questions(batch_size):
        # Infinite generator of (answer_batch, question_batch) pairs for
        # fit_generator. Each sample pairs a question with its FIRST
        # listed answer, one-hot encoded via the vocabulary table.
        while True:
            i = 0
            question_idx = np.zeros(shape=(batch_size, question_maxlen, len(qa.vocab)))
            answer_idx = np.zeros(shape=(batch_size, answer_maxlen, len(qa.vocab)))
            for s in questions:
                a = s['answers'][0]
                answer = qa.table.encode([qa.vocab[x] for x in answers[a]], answer_maxlen)
                question = qa.table.encode([qa.vocab[x] for x in s['question']], question_maxlen)
                answer_idx[i] = answer
                question_idx[i] = question
                i += 1
                if i == batch_size:
                    # NOTE(review): the same ndarrays are yielded and then
                    # overwritten in place for the next batch — safe only
                    # because the consumer uses each batch before advancing.
                    yield ([answer_idx], [question_idx])
                    i = 0
            # NOTE(review): a trailing partial batch (i < batch_size) is
            # silently dropped each pass over `questions`.

    gen = gen_questions(batch_size)
    test_gen = gen_questions(n_test)
    print('Generating model...')
    model = get_model(question_maxlen=question_maxlen, answer_maxlen=answer_maxlen,
                      vocab_len=len(qa.vocab), n_hidden=128)
    print('Training model...')
    # Keras 1.x API: samples_per_epoch / nb_epoch (renamed in Keras 2).
    for iteration in range(1, 200):
        print()
        print('-' * 50)
        print('Iteration', iteration)
        model.fit_generator(gen, samples_per_epoch=100*batch_size, nb_epoch=10)
        # Sample a small held-out-style batch and show decoded predictions.
        x, y = next(test_gen)
        y = y[0]
        pred = model.predict(x, verbose=0)
        # Decode predictions at several noise levels to diversify sampling.
        for noise in [0, 0.1, 0.2]: # not sure what noise values would be good
            print(' Noise: {}'.format(noise))
            for i in range(n_test):
                print(' Expected: {}'.format(qa.table.decode(y[i])))
                print(' Predicted: {}'.format(qa.table.decode(pred[i], noise=noise)))