########################################################################################
#
# Implementation of a prototype model for speaker recognition.
#
# It consists of a single CNN layer which convolves the input spectrogram,
# a pooling layer which reduces the CNN output to a fixed-size speaker embedding,
# and a single FC classification layer.
#
# Author(s): Nik Vaessen
########################################################################################
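
# Expected data flow (a sketch; exact shapes depend on the spectrogram
# front-end and on the ResNeXt configuration):
#   spectrogram [BATCH_SIZE, NUM_MEL, NUM_FRAMES]
#     -> Conv1d + ReLU  -> [BATCH_SIZE, 128, REDUCED_NUM_FRAMES]
#     -> ResNeXt + pool -> [BATCH_SIZE, NUM_EMBEDDING]
#     -> FC             -> [BATCH_SIZE, NUM_SPEAKERS]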
from typing import Optional, Tuple, List
import torch as t
import torch.nn as nn
import torch.nn.functional as F
import torch.optim
from pytorch_lightning import LightningModule
from torchmetrics import Accuracy
from skeleton.evaluation.evaluation import (
EmbeddingSample,
EvaluationPair,
evaluate_speaker_trials,
)
from skeleton.layers.resnext import ResNext
from skeleton.layers.resnet import ResNet
from skeleton.layers.statistical_pooling import MeanStatPool1D


########################################################################################
# Implement the lightning module for training a prototype model
# for speaker recognition on tiny-voxceleb


class PrototypeSpeakerRecognitionModule(LightningModule):
    def __init__(
        self,
        num_inp_features: int,
        num_embedding: int,
        num_speakers: int,
        learning_rate: float,
        val_trials: Optional[List[EvaluationPair]] = None,
        test_trials: Optional[List[EvaluationPair]] = None,
    ):
        super().__init__()

        # hyperparameters
        self.num_inp_features = num_inp_features
        self.num_embedding = num_embedding
        self.num_speakers = num_speakers
        self.learning_rate = learning_rate
        self.original_lr = learning_rate

        # evaluation data
        self.val_trials = val_trials
        self.test_trials = test_trials

        # 1-dimensional convolution layer which transforms the spectrogram
        # of shape [BATCH_SIZE, NUM_MEL, NUM_FRAMES]
        # into an embedding of shape [BATCH_SIZE, NUM_EMBEDDING, REDUCED_NUM_FRAMES]
        self.embedding_layer = nn.Sequential(
            nn.Conv1d(
                in_channels=num_inp_features,
                out_channels=128,
                kernel_size=3,
                stride=1,
            ),
            nn.ReLU(),
        )

        # Change prediction_layer's in_features to the output size of the model used.
        # Change the model called in compute_embedding to the model used.
        # Change the compute_embedding function entirely if ResNet is used instead of ResNeXt.
        self.resnet10 = ResNet(((32, 2, 64), (64, 2, 128)))
        self.resnet18 = ResNet(((32, 2, 64), (64, 2, 128), (128, 2, 256), (256, 2, 512)))
        self.resnet34 = ResNet(((32, 3, 64), (64, 4, 128), (128, 6, 256), (256, 3, 512)))
        self.resnext = ResNext(
            (
                (num_embedding, 2, num_embedding * 2),
                (num_embedding * 2, 2, num_embedding * 4),
                (num_embedding * 4, 2, num_embedding * 8),
                (num_embedding * 8, 2, num_embedding * 16),
            )
        )

        # Pooling layer:
        # assuming input of shape [BATCH_SIZE, NUM_EMBEDDING, REDUCED_NUM_FRAMES],
        # reduced to shape [BATCH_SIZE, NUM_EMBEDDING]
        self.pooling_layer = MeanStatPool1D(dim_to_reduce=2)
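        # A minimal sketch (an assumption about MeanStatPool1D, not necessarily
        # its actual implementation): if it simply averages over the time
        # dimension, it is equivalent to
        #   embedding = cnn_output.mean(dim=2)  # [B, E, T] -> [B, E]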

        # Fully-connected layer which is responsible for transforming the
        # speaker embedding of shape [BATCH_SIZE, NUM_EMBEDDING] into a
        # speaker prediction of shape [BATCH_SIZE, NUM_SPEAKERS]
        self.prediction_layer = nn.Sequential(
            nn.Linear(in_features=num_embedding, out_features=num_speakers),
            nn.LogSoftmax(dim=1),
        )

        # The loss function. Be careful - some loss functions apply the (log)softmax
        # layer internally (e.g. F.cross_entropy) while others do not
        # (e.g. F.nll_loss)
        self.loss_fn = F.nll_loss
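        # Example of the pairing used here: prediction_layer ends in LogSoftmax,
        # so F.nll_loss is the matching loss. An equivalent alternative (a
        # sketch, not what this module does) would be to drop the LogSoftmax
        # and train on raw logits:
        #   self.loss_fn = F.cross_entropy  # applies log-softmax internally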

        # used to keep track of training/validation accuracy
        self.train_acc = Accuracy(task="multiclass", num_classes=self.num_speakers)
        self.val_acc = Accuracy(task="multiclass", num_classes=self.num_speakers)

        # save hyperparameters for easy reloading of model
        self.save_hyperparameters()

    def forward(self, spectrogram: t.Tensor) -> Tuple[t.Tensor, t.Tensor]:
        # we split the forward pass into 2 phases:

        # first compute the speaker embedding based on the spectrogram:
        speaker_embedding = self.compute_embedding(spectrogram)

        # then compute the speaker prediction probabilities based on the
        # embedding
        speaker_prediction = self.compute_prediction(speaker_embedding)

        return speaker_embedding, speaker_prediction

    def compute_embedding(self, spectrogram: t.Tensor) -> t.Tensor:
        # modify to your liking!
        feature_representation = self.embedding_layer(spectrogram)  # -> [128, 128, 239]
        output = self.resnext(feature_representation)
        output = output[:, :, None]  # -> [128, 128, 1]
        embedding = self.pooling_layer(output)  # -> [128, 128]
        return embedding

    # Use this function if ResNet is used instead of ResNeXt.
    # def compute_embedding(self, spectrogram: t.Tensor) -> t.Tensor:
    #     resnet_output = self.resnet10(spectrogram)
    #     embedding = self.pooling_layer(resnet_output)
    #     return embedding

    def compute_prediction(self, embedding: t.Tensor) -> t.Tensor:
        # modify to your liking!
        prediction = self.prediction_layer(embedding)
        return prediction

    # Uncomment to switch to manual optimization (pairs with the commented-out
    # optimizer calls in training_step):
    # @property
    # def automatic_optimization(self) -> bool:
    #     return False

    def training_step(
        self, batch: Tuple[List[str], t.Tensor, t.Tensor], *args, **kwargs
    ) -> t.Tensor:
        # first unwrap the batch into the input tensor and ground truth labels
        sample_id, network_input, speaker_labels = batch

        # manual-optimization variant (only needed when automatic_optimization
        # is set to False above):
        # opt = self.optimizers()
        # opt.zero_grad()

        assert network_input.shape[0] == speaker_labels.shape[0]
        assert network_input.shape[1] == self.num_inp_features
        assert len(network_input.shape) == 3

        # then compute the forward pass
        embedding, prediction = self.forward(network_input)

        # based on the output of the forward pass we compute the loss
        loss = self.loss_fn(prediction, speaker_labels)
        # self.manual_backward(loss)
        # opt.step()

        # based on the output of the forward pass we compute some metrics
        self.train_acc(prediction, speaker_labels)

        # log training loss
        self.log("loss", loss, prog_bar=False)

        # The value we return will be minimized
        return loss

    def training_epoch_end(self, outputs: List[t.Tensor]) -> None:
        # at the end of a training epoch we log our metrics
        self.log("train_acc", self.train_acc, prog_bar=True)

    def validation_step(
        self, batch: Tuple[List[str], t.Tensor, t.Tensor], *args, **kwargs
    ) -> Tuple[t.Tensor, t.Tensor, List[str]]:
        # first unwrap the batch into the input tensor and ground truth labels
        sample_id, network_input, speaker_labels = batch

        assert network_input.shape[0] == speaker_labels.shape[0]
        assert network_input.shape[1] == self.num_inp_features
        assert len(network_input.shape) == 3

        # then compute the forward pass
        embedding, prediction = self.forward(network_input)

        # based on the output of the forward pass we compute the loss
        loss = self.loss_fn(prediction, speaker_labels)

        # based on the output of the forward pass we compute some metrics
        self.val_acc(prediction, speaker_labels)

        # The value(s) we return will be saved until the end of the epoch
        # and passed to `validation_epoch_end`.
        # We move the embeddings to CPU to prevent taking up space on the GPU
        # for the next batch(es)
        return embedding.to("cpu"), loss, sample_id

    def validation_epoch_end(
        self, outputs: List[Tuple[t.Tensor, t.Tensor, List[str]]]
    ) -> None:
        # at the end of a validation epoch we compute the validation EER
        # based on the embeddings and log all metrics

        # unwrap outputs
        embeddings = [embedding for embedding, _, _ in outputs]
        losses = [loss for _, loss, _ in outputs]
        sample_keys = [key for _, _, key in outputs]

        # log metrics
        self.log("val_acc", self.val_acc, prog_bar=True)
        self.log("val_loss", t.mean(t.stack(losses)), prog_bar=True)

        # compute and log val EER
        if self.val_trials is not None:
            val_eer = self._evaluate(embeddings, sample_keys, self.val_trials)
            val_eer = t.tensor(val_eer, dtype=t.float32)

            self.log("val_eer", val_eer, prog_bar=True)

    def test_step(
        self, batch: Tuple[List[str], t.Tensor, t.Tensor], *args, **kwargs
    ) -> Tuple[t.Tensor, List[str]]:
        # first unwrap the batch into the input tensor and ground truth labels
        sample_id, network_input, speaker_labels = batch

        assert network_input.shape[0] == 1
        assert network_input.shape[1] == self.num_inp_features
        assert len(network_input.shape) == 3

        # then compute the speaker embedding
        embedding = self.compute_embedding(network_input)

        # The value(s) we return will be saved until the end of the epoch
        # and passed to `test_epoch_end`.
        # We move the embeddings to CPU to prevent taking up space on the GPU
        # for the next batch(es)
        return embedding.to("cpu"), sample_id

    def test_epoch_end(self, outputs: List[Tuple[t.Tensor, List[str]]]) -> None:
        # at the end of the test epoch we compute the test EER
        if self.test_trials is None:
            return

        # unwrap outputs
        embeddings = [embedding for embedding, _ in outputs]
        sample_keys = [key for _, key in outputs]

        # compute test EER
        test_eer = self._evaluate(embeddings, sample_keys, self.test_trials)

        # log EER
        self.log("test_eer", test_eer)

    def configure_optimizers(self):
        # setup the optimization algorithm
        optimizer = t.optim.Adam(self.parameters(), self.learning_rate)

        # setup the learning rate schedule.
        # Here StepLR decays the learning rate by a factor of 0.8
        # every 10 epochs. Adapt the schedule to your liking :).
        schedule = {
            # Required: the scheduler instance.
            "scheduler": t.optim.lr_scheduler.StepLR(
                optimizer, step_size=10, gamma=0.8
            ),
            # The unit of the scheduler's step size, could also be 'step'.
            # 'epoch' updates the scheduler on epoch end whereas 'step'
            # updates it after an optimizer update.
            "interval": "epoch",
            # How many epochs/steps should pass between calls to
            # `scheduler.step()`. 1 corresponds to updating the learning
            # rate after every epoch/step.
            "frequency": 1,
            # Metric to monitor for schedulers like `ReduceLROnPlateau`
            "monitor": "val_loss",
            # If set to `True`, will enforce that the value specified by
            # 'monitor' is available when the scheduler is updated, thus
            # stopping training if not found. If set to `False`, it will
            # only produce a warning
            "strict": True,
            # If using the `LearningRateMonitor` callback to monitor the
            # learning rate progress, this keyword can be used to specify
            # a custom logged name
            "name": None,
        }

        return [optimizer], [schedule]
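
    # Example alternative (a sketch, not the configured behaviour): the
    # "monitor"/"strict" keys above only take effect for metric-driven
    # schedulers, e.g. swapping in
    #   "scheduler": t.optim.lr_scheduler.ReduceLROnPlateau(
    #       optimizer, factor=0.5, patience=3
    #   ),
    # which halves the lr after 3 epochs without val_loss improvement.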

    def _evaluate(
        self,
        embeddings: List[t.Tensor],
        sample_keys: List[List[str]],
        pairs: List[EvaluationPair],
    ):
        # construct a list of embedding samples
        assert len(embeddings) == len(sample_keys)

        embedding_list: List[EmbeddingSample] = []

        for embedding_tensor, key_list in zip(embeddings, sample_keys):
            if len(key_list) != embedding_tensor.shape[0]:
                raise ValueError("batch dimension is missing or incorrect")

            assert len(embedding_tensor.shape) == 2

            # we have to loop over the batch dimension and access each embedding
            for idx, sample_id in enumerate(key_list):
                embedding_list.append(
                    # make sure embedding is 1-dimensional, and move to CPU to avoid OOM
                    EmbeddingSample(
                        sample_id, embedding_tensor[idx, :].squeeze().to("cpu")
                    )
                )

        # evaluate the embeddings based on the trial list of pairs
        result = evaluate_speaker_trials(
            trials=pairs, embeddings=embedding_list, skip_eer=False
        )

        return result["eer"]
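

########################################################################################
# Minimal smoke-test sketch (an illustration, not part of the training pipeline):
# the hyperparameter values below are assumptions. Only the convolutional
# front-end is exercised here; the full forward() additionally requires
# prediction_layer's in_features to match the ResNeXt output size, as noted
# in __init__.
if __name__ == "__main__":
    module = PrototypeSpeakerRecognitionModule(
        num_inp_features=40,  # assumed number of mel bins
        num_embedding=128,
        num_speakers=100,
        learning_rate=3e-4,
    )
    fake_batch = t.rand(8, 40, 241)  # [BATCH_SIZE, NUM_MEL, NUM_FRAMES]
    features = module.embedding_layer(fake_batch)
    print(features.shape)  # -> [8, 128, 239] (kernel_size=3, stride=1)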