# coding=utf-8
import heapq
import random
import time

import numpy as np

import util_kd_tree as kdtree
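# Note: util_kd_tree is a project-local module; as used below it is assumed to
# expose KDTree(points, payloads) and a search(point, k) method that returns
# the k nearest nodes, each carrying its payload in `.data`.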


class KNNModel_Naive(object):
    def __init__(self, k, X_train, Y_train):
        """
        Train the model implicitly; k-NN has no explicit training step.
        :param k: number of nearest neighbours used in the majority vote.
        :param X_train: training samples, shape (n_samples, n_features).
        :param Y_train: integer training labels, shape (n_samples,).
        """
        self.k = k
        self.X_train = X_train
        self.Y_train = Y_train

    def validate(self, X_val, Y_val):
        """
        Validate the trained model.
        :param X_val: validation samples, shape (n_val, n_features).
        :param Y_val: validation labels, shape (n_val,).
        :return: (accuracy, predicted label array)
        """
        label_list = []
        row, col = X_val.shape
        for i in range(row):
            # Euclidean distance from the query point to every training sample
            # (broadcasting X_val[i, :] against the whole training matrix).
            dist = np.linalg.norm(X_val[i, :] - self.X_train, axis=1)
            # Indices of the k closest training samples.
            res_idx = np.argsort(dist)[:self.k]
            res = [self.Y_train[idx] for idx in res_idx]
            # Majority vote among the k neighbours.
            label = np.argmax(np.bincount(res))
            label_list.append(label)
        label_array = np.array(label_list)
        accuracy = np.sum(label_array == Y_val) / row
        return accuracy, label_array


class KNNModel_Heap(KNNModel_Naive):
    def validate(self, X_val, Y_val):
        """
        Validate the trained model, keeping only the k nearest candidates in a
        fixed-size heap instead of sorting all distances.
        :param X_val: validation samples, shape (n_val, n_features).
        :param Y_val: validation labels, shape (n_val,).
        :return: (accuracy, predicted label array)
        """
        label_list = []
        row, col = X_val.shape
        for i in range(row):
            heap = []
            dist = np.linalg.norm(X_val[i, :] - self.X_train, axis=1)
            for idx, d in enumerate(dist):
                if len(heap) < self.k:
                    # heapq is a min-heap, so push negated distances to get a
                    # max-heap over the k smallest distances seen so far.
                    heapq.heappush(heap, (-d, idx))
                elif d < -heap[0][0]:  # -heap[0][0] is the maximum distance in the heap.
                    heapq.heappushpop(heap, (-d, idx))
            res = [self.Y_train[r[1]] for r in heap]
            label = np.argmax(np.bincount(res))
            label_list.append(label)
        label_array = np.array(label_list)
        accuracy = np.sum(label_array == Y_val) / row
        return accuracy, label_array
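
# Per query, the heap variant does O(n log k) work instead of the naive model's
# full O(n log n) argsort, but its candidate scan is a Python-level loop rather
# than a vectorized NumPy call, which is likely why the reference timings noted
# in _TestEfficiency show it running slower than the naive model in practice.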


class KNNModel_KDTree(KNNModel_Naive):
    def __init__(self, k, X_train, Y_train):
        """
        Train the model by building a KD-tree over the training samples.
        :param k: number of nearest neighbours used in the majority vote.
        :param X_train: training samples, shape (n_samples, n_features).
        :param Y_train: integer training labels, shape (n_samples,).
        """
        super().__init__(k, X_train, Y_train)
        # Store each sample's index as the node payload so its label can be
        # looked up after a search.
        self.kd_node = kdtree.KDTree(X_train, range(X_train.shape[0]))

    def validate(self, X_val, Y_val):
        """
        Validate the trained model using KD-tree nearest-neighbour search.
        :param X_val: validation samples, shape (n_val, n_features).
        :param Y_val: validation labels, shape (n_val,).
        :return: (accuracy, predicted label array)
        """
        label_list = []
        row, col = X_val.shape
        for i in range(row):
            nn_nodes = self.kd_node.search(X_val[i], self.k)
            res = [self.Y_train[n.data] for n in nn_nodes]
            label = np.argmax(np.bincount(res))
            label_list.append(label)
        label_array = np.array(label_list)
        accuracy = np.sum(label_array == Y_val) / row
        return accuracy, label_array
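
# Minimal usage sketch: all three models share the (k, X_train, Y_train)
# constructor and validate() API defined above; the tiny data set here is
# purely illustrative.
#
#     X = np.array([[0.0, 0.0], [0.2, 0.1], [1.0, 1.0], [0.9, 1.1]])
#     Y = np.array([0, 0, 1, 1])
#     model = KNNModel_Naive(3, X, Y)
#     accuracy, predictions = model.validate(np.array([[0.1, 0.0]]), np.array([0]))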


def _TestEfficiency():
    X_train, Y_train = [], []
    # Reference timings (seconds) with k=5, n=100000, m=200: 2.4 (Naive), 6.0 (Heap), 0.05 (KDTree)
    k_, n_, t_num, d = 5, 1000, 200, 500
    # Train
    for _ in range(n_):
        X_train.append([random.uniform(0, 50) for _ in range(d)])
        Y_train.append(random.randint(0, 1))
    X_train, Y_train = np.array(X_train), np.array(Y_train)
    # Validate
    X_val, Y_val = [], []
    for _ in range(t_num):
        X_val.append([random.uniform(0, 50) for _ in range(d)])
        Y_val.append(random.randint(0, 1))
    X_val, Y_val = np.array(X_val), np.array(Y_val)
    for model_class in (KNNModel_Naive, KNNModel_Heap, KNNModel_KDTree):
        model = model_class(k_, X_train, Y_train)
        print('=' * 5 + 'Model type %s' % model_class.__name__ + '=' * 5)
        t1 = time.perf_counter()  # time.clock() was removed in Python 3.8.
        accuracy, _ = model.validate(X_val, Y_val)
        print('Accuracy is %r' % (accuracy,))
        print('Total used time is %r' % (time.perf_counter() - t1,))


def _TestVisualization():
    import matplotlib.pyplot as plt
    k, m, n_train, n_val = 5, 4, 5, 2
    X_train, X_val, Y_train, Y_val = [], [], [], []
    color = ['c', 'g', 'b', 'r']
    for i in range(m):
        # Class i occupies its own unit cell of a 2x2 grid.
        for _ in range(n_train):
            x, y = random.uniform(int(i / 2) + 0.1, int(i / 2) + 0.9), random.uniform(i % 2 + 0.1, i % 2 + 0.9)
            X_train.append((x, y))
            Y_train.append(i)
            plt.scatter(x, y, s=100, c=color[i])
        for _ in range(n_val):
            x, y = random.uniform(int(i / 2) + 0.1, int(i / 2) + 0.9), random.uniform(i % 2 + 0.1, i % 2 + 0.9)
            X_val.append((x, y))
            Y_val.append(i)
    X_train, X_val, Y_train, Y_val = np.array(X_train), np.array(X_val), np.array(Y_train), np.array(Y_val)
    for model_class in (KNNModel_KDTree,):
        model = model_class(k, X_train, Y_train)
        accuracy, label_val = model.validate(X_val, Y_val)
        for i in range(len(label_val)):
            # Fill colour shows the true label, edge colour the predicted label.
            plt.scatter(X_val[i, 0], X_val[i, 1], alpha=0.3, s=100,
                        c=color[Y_val[i]], linewidths=2, edgecolors=color[label_val[i]])
    plt.grid()
    plt.xlim(0, 2)
    plt.ylim(0, 2)
    plt.show()


if __name__ == '__main__':
    _TestEfficiency()
    # Visualization
    # _TestVisualization()