Network.py
#!/usr/bin/python
from __future__ import division
import numpy as np
import random as rand
import math as m
class Network(object):
class QuadraticCost(object):
@staticmethod
def delta(z, a, y):
return sigmoid_prime(z) * (a - y)
@staticmethod
def fn(a, y):
            # Use numpy operations so this also works on array inputs (math.pow does not).
            return 0.5 * np.sum((a - y) ** 2)
class CrossEntropy(object):
@staticmethod
def delta(z, a, y):
return (a - y)
@staticmethod
def fn(a, y):
return np.sum(
np.nan_to_num(-y * np.log(a) - (1 - y) * np.log(1 - a)))
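    # For the quadratic cost C = 0.5*(a - y)^2, the output error is
    #     delta = dC/da * sigma'(z) = (a - y) * sigma'(z),
    # while for the cross-entropy cost the sigma'(z) factor cancels, leaving
    #     delta = (a - y),
    # which is why CrossEntropy.delta above ignores its z argument.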
def __init__(self, sizes, cost=CrossEntropy, debug=False):
self.num_layers = len(sizes)
self.biases = [np.random.randn(y, 1) for y in sizes[1:]]
self.weights = [(1 / m.sqrt(x)) * np.random.randn(y, x)
for x, y in zip(sizes[:-1], sizes[1:])]
self.cost = cost
self.epoch_test_accuracy = []
self.epoch_train_accuracy = []
self.epoch_train_cost = []
self.epoch_test_cost = []
self.debug = debug
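    # Weights are drawn from N(0, 1/n_in) via the 1/sqrt(x) scaling in __init__, so the
    # weighted input z to each neuron starts with roughly unit variance, which keeps the
    # sigmoid neurons from saturating early in training.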
def feedforward(self, a):
for b, w in zip(self.biases, self.weights):
a = sigmoid(np.dot(w, a) + b)
return a
    def weight_bias_initializer(self):
        # Not implemented; weights and biases are initialized directly in __init__.
        return ""
    def SGD(self,
            training_data,
            epochs,
            mini_batch_size,
            eta,
            decay=0.0,
            test_data=None,
            early_stop=-1):
"""
:param self:
:param training_data: (X,Y) input.
:param epochs:
:param mini_batch_size: Subset size of training data
:param eta: Number of iterations
:param test_data:
"""
orig_eta = eta
if early_stop < 0:
early_stop = epochs
epoch_train_accuracy = []
epoch_test_accuracy = []
epoch_train_cost = []
epoch_test_cost = []
if test_data:
n_test = len(test_data)
n = len(training_data)
        best_score = -1
duration_not_max = 0
for j in xrange(epochs):
rand.shuffle(training_data)
mini_batches = [
training_data[k: k + mini_batch_size]
for k in range(0, n, mini_batch_size)]
for mini_batch in mini_batches:
                self.update_mini_batch(mini_batch, eta, decay, n)
if test_data:
score = self.evaluate(test_data)
                if score > best_score:
                    best_score = score
duration_not_max = 0
else:
duration_not_max += 1
if self.debug:
print "Epoch {0}: {1} / {2}".format(
j, score, n_test)
else:
if self.debug:
print "Epoch {0} complete".format(j)
epoch_train_accuracy.append(self.calc_accuracy(training_data, convert=True))
epoch_train_cost.append(self.calc_cost(training_data))
            if test_data:
                epoch_test_accuracy.append(self.calc_accuracy(test_data))
                epoch_test_cost.append(self.calc_cost(test_data, convert=True))
if duration_not_max >= early_stop:
if eta * 128 < orig_eta:
print "Score didn't rise for {0} epochs and learning rate is 1/128 of the original, exiting early".format(early_stop)
break
                else:
                    # Reset the stall counter so eta is reduced once per
                    # `early_stop`-epoch window rather than every epoch thereafter.
                    eta *= 0.1
                    duration_not_max = 0
self.epoch_train_accuracy = epoch_train_accuracy
self.epoch_train_cost = epoch_train_cost
self.epoch_test_accuracy = epoch_test_accuracy
self.epoch_test_cost = epoch_test_cost
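    # In calc_accuracy / calc_cost below, `convert` reflects the label format:
    # training examples carry one-hot vectors (so accuracy needs np.argmax(y) and
    # cost can use y directly), while test examples carry integer labels (so
    # accuracy compares directly and cost needs vectorized_result(y)).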
def calc_accuracy(self, data, convert=False):
if convert:
test_results = [(np.argmax(self.feedforward(x)), np.argmax(y))
for (x, y) in data]
else:
test_results = [(np.argmax(self.feedforward(x)), y)
for (x, y) in data]
return sum(int(x == y) for (x, y) in test_results) / len(data)
    def calc_cost(self, data, convert=False):
        if convert:
            cost_total = [self.cost.fn(self.feedforward(x), vectorized_result(y))
                          for (x, y) in data]
        else:
            cost_total = [self.cost.fn(self.feedforward(x), y)
                          for (x, y) in data]
        return np.sum(cost_total) / len(data)
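    # update_mini_batch stacks the whole mini-batch into single matrices (one
    # training example per column) and runs backprop once over the batch, so
    # nabla_w comes out already summed over examples, while nabla_b has to be
    # summed across columns explicitly before the parameter update.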
    def update_mini_batch(self, mini_batch, eta, decay, n):
        # Stack the mini-batch into single matrices, one training example per column.
        x = np.column_stack([xi for xi, _ in mini_batch])
        y = np.column_stack([yi for _, yi in mini_batch])
nabla_b, nabla_w = self.backprop(x, y)
for i in xrange(self.num_layers - 1):
nabla_b[i] = np.sum(nabla_b[i], axis=1).reshape(len(nabla_b[i]), 1)
self.weights = [(1 - (decay * eta / n)) * w - (eta / len(mini_batch)) * nw
for w, nw in zip(self.weights, nabla_w)]
self.biases = [b - (eta / len(mini_batch)) * nb
for b, nb in zip(self.biases, nabla_b)]
def backprop(self, x, y):
"""Return a tuple ``(nabla_b, nabla_w)`` representing the
gradient for the cost function C_x. ``nabla_b`` and
``nabla_w`` are layer-by-layer lists of numpy arrays, similar
to ``self.biases`` and ``self.weights``."""
nabla_b = [np.zeros(b.shape) for b in self.biases]
nabla_w = [np.zeros(w.shape) for w in self.weights]
# feedforward
activation = x
activations = [x] # list to store all the activations, layer by layer
zs = [] # list to store all the z vectors, layer by layer
for b, w in zip(self.biases, self.weights):
            # b has shape (n, 1); numpy broadcasting adds it to every column of the batch.
            z = np.dot(w, activation) + b
zs.append(z)
activation = sigmoid(z)
activations.append(activation)
# backward pass
delta = (self.cost).delta(z, activations[-1], y)
nabla_b[-1] = delta
nabla_w[-1] = np.dot(delta, activations[-2].transpose())
# Note that the variable l in the loop below is used a little
# differently to the notation in Chapter 2 of the book. Here,
# l = 1 means the last layer of neurons, l = 2 is the
# second-last layer, and so on. It's a renumbering of the
# scheme in the book, used here to take advantage of the fact
# that Python can use negative indices in lists.
for l in xrange(2, self.num_layers):
z = zs[-l]
sp = sigmoid_prime(z)
delta = np.dot(self.weights[-l + 1].transpose(), delta) * sp
nabla_b[-l] = delta
nabla_w[-l] = np.dot(delta, activations[-l - 1].transpose())
return (nabla_b, nabla_w)
def evaluate(self, test_data):
"""Return the number of test inputs for which the neural
network outputs the correct result. Note that the neural
network's output is assumed to be the index of whichever
neuron in the final layer has the highest activation."""
test_results = [(np.argmax(self.feedforward(x)), y)
for (x, y) in test_data]
return sum(int(x == y) for (x, y) in test_results)
def cost_derivative(self, output_activations, y):
"""Return the vector of partial derivatives \partial C_x /
\partial a for the output activations."""
return (output_activations - y)
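    # Note: backprop does not call cost_derivative; it uses self.cost.delta instead.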
def vectorized_result(j):
"""Return a 10-dimensional unit vectosizesr with a 1.0 in the j'th position
and zeroes elsewhere. This is used to convert a digit (0...9)
into a corresponding desired output from the neural network.
"""
e = np.zeros((10, 1))
e[j] = 1.0
return e
def sigmoid(z):
"""The sigmoid function."""
return 1.0 / (1.0 + np.exp(-z))
def sigmoid_prime(z):
"""Derivative of the sigmoid function."""
return sigmoid(z) * (1 - sigmoid(z))
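
# Minimal usage sketch (not part of the original module): trains the network on
# random toy data just to exercise the API. The layer sizes, learning rate, and
# epoch count below are illustrative assumptions, not tuned values.
if __name__ == "__main__":
    np.random.seed(0)
    # 200 fake "images" with 20 features each and labels 0-9; training labels
    # are one-hot column vectors, test labels are plain integers.
    train = [(np.random.randn(20, 1), vectorized_result(rand.randint(0, 9)))
             for _ in xrange(200)]
    test = [(np.random.randn(20, 1), rand.randint(0, 9))
            for _ in xrange(50)]
    net = Network([20, 15, 10], cost=Network.CrossEntropy, debug=True)
    net.SGD(train, epochs=3, mini_batch_size=10, eta=0.5,
            decay=0.1, test_data=test)
    print "Final test accuracy: {0}".format(net.epoch_test_accuracy[-1])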