# -*- coding: utf-8 -*-
"""
Created on Wed Jun 5 12:11:49 2019
@author: ma10s
"""
# Use the backend bundled with tf.keras so that K and keras refer to the
# same package (mixing standalone keras with tf.keras can break).
from tensorflow import keras
from tensorflow.keras import backend as K
import matplotlib.pyplot as plt


def dice_coef(y_true, y_pred, smooth=1):
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)


def dice_coef_loss(y_true, y_pred):
    return 1 - dice_coef(y_true, y_pred)
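# Quick sanity check (a minimal sketch, not from the original file): a perfect
# binary prediction drives dice_coef toward 1 and the loss toward 0, e.g.
#   y = K.constant([0., 1., 1., 0.])
#   K.eval(dice_coef_loss(y, y))  # ~0.0 for any smooth > 0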


# Class definition
class Unet_model():
    """
    U-Net wrapper; Keras is used to define the convolutional (CNN) and
    dense (DNN) layers of the model.
    """

    def __init__(self, input_shape=(420, 580, 1), output_shape=15, con_len=3, con_layers=[32, 64, 128, 256],
                 last_pooling=keras.layers.AvgPool1D, dense_layers=[100, 100], dataname='noname'):
        self.name = 'unet_con_len' + str(con_len) + '_con_layers' + str(con_layers) + '_dense_layers' + str(
            dense_layers) + '_data' + dataname
        self.model = construct_model(input_shape, output_shape, con_len=con_len, con_layers=con_layers,
                                     last_pooling=last_pooling, dense_layers=dense_layers)
        self.save_as = 'saved_models/' + self.name
    # Train the model given the data.
    def train(self, inputs, targets, validation_inputs=None, validation_targets=None, batch_size=32, epochs=20,
              learning_rate=0.001, save_model=True, val_freq=1, early_stopping_patience=5,
              plot_training_progress=False, verbose=1):
        # Early-stop on validation loss; mean absolute error is not among the
        # compiled metrics below, so monitoring it would never trigger.
        es = keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', verbose=verbose,
                                           patience=early_stopping_patience)
        callbacks = [es]
        if save_model:
            # Accuracy should be maximized, so the checkpoint mode is 'max'.
            mcp_save = keras.callbacks.ModelCheckpoint(self.save_as + '.hdf5',
                                                       save_best_only=True,
                                                       monitor='accuracy',
                                                       mode='max')
            callbacks.append(mcp_save)
        # Using Adam optimizer.
        # loss = 'binary_crossentropy'
        self.model.compile(optimizer=keras.optimizers.Adam(learning_rate), loss=dice_coef_loss, metrics=['accuracy'])
        if validation_inputs is not None:
            history = self.model.fit(
                inputs, targets,
                validation_data=(validation_inputs, validation_targets),
                validation_freq=val_freq,
                epochs=epochs, batch_size=batch_size, shuffle=True,
                callbacks=callbacks, verbose=verbose)
        else:
            history = self.model.fit(
                inputs, targets, epochs=epochs, batch_size=batch_size,
                shuffle=True,
                callbacks=callbacks, verbose=verbose)
        # To avoid overfitting, reload the best checkpointed weights after
        # training. The model was saved with a custom loss, so it has to be
        # registered when loading.
        if save_model:
            self.model = keras.models.load_model(self.save_as + '.hdf5',
                                                 custom_objects={'dice_coef_loss': dice_coef_loss,
                                                                 'dice_coef': dice_coef})
        # TODO: concatenate the histories of successive training runs to plot
        # all of the training progress.
        if plot_training_progress:
            # Plot the tracked loss; MAE is not among the compiled metrics.
            plt.plot(history.history['loss'])
            if 'val_loss' in history.history:
                plt.plot(history.history['val_loss'])
        return history
    # Predict
    def predict(self, xt):
        return self.model.predict(xt)

    def load_model(self, save_as=None):
        if save_as is None:
            save_as = self.save_as
        print("model load: ", save_as)
        print("self.name: ", self.name, ", self.save_as: ", self.save_as)
        # Register the custom loss so Keras can deserialize the saved model.
        self.model = keras.models.load_model(save_as + '.hdf5',
                                             custom_objects={'dice_coef_loss': dice_coef_loss,
                                                             'dice_coef': dice_coef})


def construct_model(input_shape, output_shape, con_len=3, con_layers=[25, 50, 100],
                    last_pooling=keras.layers.AvgPool2D, dense_layers=[100, 100]):
    # TODO: add a **kwargs to specify the hyperparameters
    levels = len(con_layers) - 1
    inputs = keras.Input(input_shape)
    conv = []  # pre-pooling feature maps, kept for the skip connections
    conv_ = inputs
    # Contracting path: two convolutions per level followed by 2x2
    # max-pooling; the pre-pooling feature maps feed the skip connections.
    for level in range(levels):
        conv_ = keras.layers.Conv2D(con_layers[level], con_len, activation='relu', padding='same',
                                    kernel_initializer='he_normal')(conv_)
        conv_ = keras.layers.Conv2D(con_layers[level], con_len, activation='relu', padding='same',
                                    kernel_initializer='he_normal')(conv_)
        conv.append(conv_)
        conv_ = keras.layers.MaxPooling2D(pool_size=(2, 2))(conv_)
    # Bottleneck: the deepest pair of convolutions, with no skip connection.
    conv_ = keras.layers.Conv2D(con_layers[levels], con_len, activation='relu', padding='same',
                                kernel_initializer='he_normal')(conv_)
    conv_ = keras.layers.Conv2D(con_layers[levels], con_len, activation='relu', padding='same',
                                kernel_initializer='he_normal')(conv_)
    # Expanding path: upsample with a strided transposed convolution, pad if
    # needed to match the encoder feature map, then concatenate the skip
    # connection and convolve twice.
    for level in range(levels - 1, -1, -1):
        # conv_ = keras.layers.Conv2D(con_layers[level], 2, activation='relu', padding='same',
        #                             kernel_initializer='he_normal')(keras.layers.UpSampling2D(size=(2, 2))(conv_))
        conv_ = keras.layers.Conv2DTranspose(con_layers[level], 2, strides=[2, 2], activation='relu',
                                             padding='same')(conv_)
        if conv_.shape[1:3] != conv[level].shape[1:3]:
            # Odd input sizes lose a row/column at each pooling step, so the
            # upsampled map can be one pixel short of the skip connection.
            x_padding = conv[level].shape[1] - conv_.shape[1]
            y_padding = conv[level].shape[2] - conv_.shape[2]
            conv_ = keras.layers.ZeroPadding2D(((0, x_padding), (0, y_padding)))(conv_)
        conv_ = keras.layers.concatenate([conv[level], conv_], axis=3)
        conv_ = keras.layers.Conv2D(con_layers[level], con_len, activation='relu', padding='same',
                                    kernel_initializer='he_normal')(conv_)
        conv_ = keras.layers.Conv2D(con_layers[level], con_len, activation='relu', padding='same',
                                    kernel_initializer='he_normal')(conv_)
    # Segmentation head: refine, then map to a single-channel sigmoid mask of
    # per-pixel probabilities.
    conv_ = keras.layers.Conv2D(con_layers[0], 3, activation='relu', padding='same',
                                kernel_initializer='he_normal')(conv_)
    conv_ = keras.layers.Conv2D(2, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv_)
    conv_ = keras.layers.Conv2D(1, 1, activation='sigmoid')(conv_)
    model = keras.models.Model(inputs=inputs, outputs=conv_)
    model.summary()
    return model
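

if __name__ == '__main__':
    # Minimal smoke test (a sketch, not part of the original file): the shapes
    # and random data below are illustrative assumptions, not a real dataset.
    import numpy as np

    x = np.random.rand(4, 64, 64, 1).astype('float32')
    y = (np.random.rand(4, 64, 64, 1) > 0.5).astype('float32')
    unet = Unet_model(input_shape=(64, 64, 1), con_layers=[8, 16, 32], dataname='smoke_test')
    unet.train(x, y, epochs=1, batch_size=2, save_model=False, verbose=0)
    masks = unet.predict(x)  # per-pixel probabilities in [0, 1]
    print('predicted mask shape:', masks.shape)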