-
Notifications
You must be signed in to change notification settings - Fork 0
/
roc_failedfouriertransform.py
210 lines (172 loc) · 8.96 KB
/
roc_failedfouriertransform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import os
import numpy as np
import pickle
from sklearn.metrics import roc_curve, auc
from matplotlib import pyplot as plt
from config import *
from attacks import *
from tools import import_images, multiprocessed_workload, update_parameters
from detection_failedfouriertransform import DWT_LEVEL, similarity, extract_watermark, wpsnr
from embedment_failedfouriertransform import embed_watermark
def compute_ROC(scores, labels, alpha, show: bool = True):
	"""Compute the ROC curve for the collected scores and pick a threshold.

	Args:
		scores: Similarity scores gathered during training.
		labels: 1 for true-positive samples, 0 for true-negative samples.
		alpha: Embedding strength; used only in the plot title.
		show (bool, optional): When True, display the ROC plot. Defaults to True.

	Returns:
		tuple: (threshold, tpr, fpr) at the first ROC point whose FPR is
		strictly above TARGET_FPR (or the last point if none exceeds it).
	"""
	# compute ROC
	fpr, tpr, thr = roc_curve(np.asarray(labels), np.asarray(scores), drop_intermediate=False)
	# compute AUC
	roc_auc = auc(fpr, tpr)
	if show is True:
		plt.figure()
		lw = 2
		plt.plot(fpr, tpr, color='darkorange',
				 lw=lw, label='AUC = %0.2f' % roc_auc)
		plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
		plt.xlim([-0.01, 1.0])
		plt.ylim([0.0, 1.05])
		plt.xlabel('False Positive Rate')
		plt.ylabel('True Positive Rate')
		plt.title(f'Receiver Operating Characteristic (ROC) with alpha {str(alpha)}')
		plt.legend(loc="lower right")
		plt.show()
	# Pick the first ROC point whose FPR is strictly above TARGET_FPR.
	# The previous min()-over-positive-differences raised ValueError when
	# no FPR exceeded TARGET_FPR; fall back to the last point in that case.
	above = np.flatnonzero(fpr > TARGET_FPR)
	idx = int(above[0]) if above.size else len(fpr) - 1
	print('For a FPR approximately equals to %0.2f corresponds a TPR equal to %0.2f and a threshold equal to %0.4f with FPR equal to %0.2f' % (TARGET_FPR, tpr[idx], thr[idx], fpr[idx]))
	return thr[idx], tpr[idx], fpr[idx]  # return thr
def generate_watermark(size_h: int, size_v: int = 0, save: bool = False) -> np.ndarray:
	"""Create a random binary watermark.

	When size_v is given the watermark is (size_v, size_h); otherwise it is
	a square of side size_h.

	Args:
		size_h (int): Horizontal size.
		size_v (int, optional): Vertical size. Defaults to size_h.
		save (bool, optional): When True, save the flat mark to 'mark.npy'.

	Returns:
		np.ndarray: Random 0/1 watermark of the requested shape.
	"""
	height = size_v if size_v != 0 else size_h
	# Draw uniform reals in [0, 1) and round each one to the nearest integer,
	# yielding an unbiased stream of 0/1 bits.
	raw = np.random.uniform(0.0, 1.0, height * size_h)
	bits = np.uint8(np.rint(raw))
	if save is True:
		# Note: the flat (unreshaped) array is what gets written to disk.
		np.save('mark.npy', bits)
	return bits.reshape((height, size_h))
def compute_thr_multiple_images(images, original_watermark, attacks, alpha, level, subband, continue_training, show: bool = True):
	"""Build score/label populations by attacking watermarked images, then derive a detection threshold.

	For every (original, watermarked) image pair, RUNS_PER_IMAGE random attacks
	are applied; each surviving attack yields one true-positive score
	(similarity to the real watermark) and N_FALSE_WATERMARKS_GENERATIONS
	true-negative scores (similarity to freshly random watermarks).

	Args:
		images: Iterable of (original_img, watermarked_img, img_name) triples.
		original_watermark: The embedded watermark to compare extractions against.
		attacks: Pre-generated attack sequences; entry j is used for run j.
			NOTE(review): entries are overwritten in place when an attack is
			too destructive, so the caller's list is mutated.
		alpha: Embedding strength used during extraction.
		level: DWT level used during extraction.
		subband: Subband list used during extraction.
		continue_training: When True and a saved model with the same
			parameters exists, resume from its stored scores/labels.
		show (bool, optional): Forwarded to compute_ROC to display the plot.

	Returns:
		tuple: (scores, labels, (threshold, tpr, fpr)) where the last element
		comes from compute_ROC over the accumulated populations.
	"""
	scores = []
	labels = []
	n_images = len(images)
	i = 0  # index of the current image (for progress printing)
	m = 0  # count of completed computations (for progress printing)
	attack_idx = 0  # offset into the flat `attacks` list for the current image
	model_name = '_'.join([str(alpha),str(level),'-'.join(subband)])
	if continue_training:
		if exists_model(model_name):
			# Resume: seed the populations with previously collected samples.
			scores, labels, _, _, _ = read_model(model_name)
	n_computations = n_images * RUNS_PER_IMAGE * N_FALSE_WATERMARKS_GENERATIONS
	print('Total number of computations: %d' % n_computations)
	# step by step for clarity
	for original_img, watermarked_img, img_name in images:
		for j in range(attack_idx, attack_idx+RUNS_PER_IMAGE):
			attacked_img, attacks_list = do_attacks(watermarked_img, attacks[j])
			# Reject attacks that degrade quality below 35 dB WPSNR and draw
			# a replacement until the attacked image stays acceptable.
			while wpsnr(original_img, attacked_img) < 35:
				# print("Retry", attacks_list, "was too powerful")
				attacks[j] = get_random_attacks(randint(MIN_N_ATTACKS, MAX_N_ATTACKS))
				attacked_img, attacks_list = do_attacks(watermarked_img, attacks[j])
			# 3. Extract the watermark with your planned technique Wextracted
			extracted_watermark = extract_watermark(original_img, attacked_img, alpha, level, subband)
			# 4. Compute sim(Woriginal,Wextracted) and append it in the scores array and the value 1 in the labels array. These values will correspond to the true positive hypothesis.
			# true positive population
			scores.append(similarity(original_watermark, extracted_watermark))
			labels.append(1)
			# perform multiple comparisons with random watermarks to better train the classifier against false positives
			# 5. Generate a random watermark Wrandom and compute sim(Wrandom,Wextracted) to append it in the scores array and the value 0 in the labels array. These values will correspond to the true negative hypothesis.
			for k in range(0, N_FALSE_WATERMARKS_GENERATIONS):
				print('{}/{} - Performed attack {}/{} on image {}/{} ({}) - false check {}/{} - attacks: {}'.format(m + 1, n_computations, j%RUNS_PER_IMAGE, RUNS_PER_IMAGE, i + 1, n_images, img_name, k + 1, N_FALSE_WATERMARKS_GENERATIONS, attacks_list))
				# true negative population
				fake_mark = generate_watermark(32)
				scores.append(similarity(fake_mark, extracted_watermark))
				labels.append(0)
				m += 1
		i += 1
		attack_idx += RUNS_PER_IMAGE
	# 6. with scores and labels, generate the ROC and choose the best threshold τ corresponding to a False Positive Rate FPR ∈ [0, 0.1].
	return scores,labels,compute_ROC(scores, labels, alpha, show)
def save_model(scores: list, labels: list, threshold: float, tpr: float, fpr: float, alpha: float, level: int, subband: list) -> None:
	"""Saves the trained model to models/model_<alpha>_<level>_<subband>

	The scores and labels are saved too in case we want to continue training.

	Args:
		scores (list): Scores list
		labels (list): Labels list
		threshold (float): The threshold
		tpr (float): The true positive rate
		fpr (float): The false positive rate
		alpha (float): The alpha used for embedding
		level (int): The level used for embedding
		subband (list): The subband(s) used for embedding
	"""
	directory = 'models/'
	# exist_ok avoids the check-then-create race of isdir + mkdir.
	os.makedirs(directory, exist_ok=True)
	params = '_'.join([str(alpha), str(level), '-'.join(subband)])
	# Context manager guarantees the file is closed even if pickling fails.
	with open(directory + 'model_' + params, 'wb') as f:
		pickle.dump((scores, labels, threshold, tpr, fpr), f, protocol=2)
def read_model(name: str) -> tuple:
	"""Loads a model from models/model_<name>.

	Args:
		name (str): Name of the model to be loaded (the part after 'model_')

	Returns:
		tuple: (scores, labels, threshold, tpr, fpr) as written by save_model
	"""
	# Context manager closes the file even if unpickling fails; the pickled
	# payload is a 5-tuple, so it can be unpacked directly.
	with open('models/model_' + name, 'rb') as f:
		scores, labels, threshold, tpr, fpr = pickle.load(f)
	return scores, labels, threshold, tpr, fpr
def exists_model(name: str) -> bool:
	"""Checks if a saved model exists on disk.

	Args:
		name (str): Name of the model to be checked (the part after 'model_')

	Returns:
		bool: True if models/model_<name> exists
	"""
	return os.path.exists('models/model_' + name)
def create_model(params, order_of_execution):
	"""Embed the watermark into every image, train a threshold model, save it.

	Designed to run inside a worker process: the project modules are imported
	locally so the function is self-contained when pickled across processes.

	Args:
		params: Tuple of (images, watermark, alpha, level, subband, attacks,
			show_threshold, continue_training).
		order_of_execution: Opaque ordering token, returned unchanged so the
			caller can match results to submitted work items.

	Returns:
		tuple: (order_of_execution, threshold, tpr, fpr, alpha, level, subband)
	"""
	from roc_failedfouriertransform import compute_thr_multiple_images
	from embedment_failedfouriertransform import embed_watermark
	images, watermark, alpha, level, subband, attacks, show_threshold, continue_training = params
	# Pair each original with its freshly watermarked counterpart.
	watermarked_images = [
		(original_img, embed_watermark(original_img, watermark, alpha, level, subband), img_name)
		for original_img, img_name in images
	]
	scores, labels, (threshold, tpr, fpr) = compute_thr_multiple_images(
		watermarked_images, watermark, attacks, alpha, level, subband, continue_training, show_threshold)
	save_model(scores, labels, threshold, tpr, fpr, alpha, level, subband)
	return order_of_execution, threshold, tpr, fpr, alpha, level, subband
def print_models():
	"""Print tpr, fpr and threshold for every model in the sweep grid.

	Missing model files are skipped instead of crashing: the previous version
	raised FileNotFoundError whenever a model in the grid had not been
	trained yet (the "sometimes this crashes" noted in the old comment).
	"""
	for alpha in range(18, 25):
		for level in [2]:
			for subband in [["LL"]]:
				name = '_'.join([str(int(alpha)), str(level), "-".join(subband)])
				try:
					(scores, labels, threshold, tpr, fpr) = read_model(name)
				except FileNotFoundError:
					# Model not trained yet (or its file was removed): skip it.
					continue
				print(name.ljust(10), round(tpr, 2), round(fpr, 2), round(threshold, 2))
def threshold_computation(continue_training):
	"""Run the full threshold-training sweep over alpha/level/subband.

	Loads the test images and the reference watermark, pre-generates one
	random attack sequence per (image, run) pair, then fans the parameter
	grid out to worker processes via multiprocessed_workload.

	Args:
		continue_training: When True, resume from previously saved models.
	"""
	images = import_images('images/test/original/', N_IMAGES_LIMIT, True)
	watermark = np.load("failedfouriertransform.npy").reshape((MARK_SIZE, MARK_SIZE))
	# 2. In a loop, attack one by one these images (with random attacks or the strategy you prefer)
	attacks = [
		get_random_attacks(randint(MIN_N_ATTACKS, MAX_N_ATTACKS))
		for _ in images
		for _ in range(0, RUNS_PER_IMAGE)
	]
	show_threshold = True
	# One work item per point of the parameter grid.
	work = [
		(images, watermark, alpha, level, subband, attacks, show_threshold, continue_training)
		for alpha in range(18, 25)
		for level in [2]
		for subband in [["LL"]]
	]
	result = multiprocessed_workload(create_model, work)
	print(result)
if __name__ == '__main__':
	# Prompt (Italian): "Do you want to continue training old models [Y/n]?"
	answer = input("Vuoi tu continuare il training di vecchi modelli [Y/n]?\n >")
	# Only a literal "Y" resumes training; anything else starts fresh.
	threshold_computation(answer == "Y")
	print_models()