-
Notifications
You must be signed in to change notification settings - Fork 1
/
affinity_propagation.py
136 lines (120 loc) · 4.49 KB
/
affinity_propagation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 13 17:34:37 2023
@author: wousch
"""
import numpy as np
import copy
def aff_prop_a(A,R,lambda_damp):
# 'Affinitity Propagation clustering'
# 'Makes availability matrix'
# 'Supply availability (A) and responsibility (R) matrix'
# 'Keyword: lambda (default=0.5)'
N=R.shape[1]
A_update = copy.deepcopy(R)
A_update[A_update < 0] = 0
A_update[range(N),range(N)] = 0
A_update = np.sum(A_update,axis=0)
A_update += np.diagonal(R)
A_update = np.tile(A_update,(N,1))
R2 = copy.deepcopy(R)
R2[R < 0] = 0
A_update -= R2
A_update[A_update > 0] = 0
R2 = copy.deepcopy(R)
R2[range(N),range(N)] = 0
R2[R2 < 0] = 0
A_update[range(N),range(N)] = np.sum(R2,axis=0)
A = (1-lambda_damp)*A_update + lambda_damp*A
return(A)
def aff_prop_r(S,A,R,lambda_damp,bit16):
# 'Affinitity Propagation clustering'
# 'Makes responsibility matrix'
# 'Supply similarity (S), availability (A), and responsibility (R) matrix'
# 'Keyword: lambda (default=0.5)'
N=S.shape[1]
SA=S+A
SA[range(N),range(N)]=-np.inf
#first_max=np.max(SA,axis=1)
idx_max=np.argmax(SA,axis=1)
SA[range(N),idx_max]=-np.inf
second_max=np.max(SA,axis=1)
del SA
if bit16==1:
maxmat=np.ones([N,N],dtype=np.float16)
else:
maxmat=np.ones([N,N],dtype=np.float32)
maxmat[range(N),idx_max]=second_max
R_update = S - maxmat
R = (1 - lambda_damp) * R_update + lambda_damp * R
return(R)
def aff_prop_s(M,preference,prefmultiply,bit16):
# 'Affinitity Propagation clustering'
# 'Makes similaritiy matrix'
# 'supply matrix: [points,observations]'
# 'Example: M=[100,3] for 100 points with 3 values each (e.g. x,y,z)'
# 'Returns similarity matrix S (e.g. [100,100])'
# 'Keyword: preference (default=median(S)), prefmultiply (default=1)'
print('Making similarity matrix...')
sizeM=M.shape
if bit16==1:
S=np.zeros([sizeM[0],sizeM[0]],dtype=np.float16)
else:
S=np.zeros([sizeM[0],sizeM[0]],dtype=np.float32)
if len(sizeM)==1:
tmp=np.tile(M,(sizeM[0],1))
S+=(-(tmp-tmp.T)**2)
else:
for i in range(sizeM[1]):
tmp=np.tile(M[:,i],(sizeM[0],1)).T
S+=(-(tmp-tmp.T)**2)
if preference == None:
preference=np.median(S)
preference*=prefmultiply
S[np.arange(0,sizeM[0]),np.arange(0,sizeM[0])]=preference
print('Done.')
return(S)
def aff_prop(M,preference=None,prefmultiply=1,lambda_damp=0.5,maxiter=10,maxtries=10000,verbose=False,bit16=0):
# 'Affinitity Propagation clustering for IDL, and for Python :)'
# 'Supply matrix: [points,observations]'
# 'Example: M=[100,3] for 100 points with 3 values each (e.g. x,y,z)'
# 'Clustering occurs on the basis of the observations. Any number of observations allowed'
# 'Returns an array of labels with the same size as number of points'
# 'Each entry of the returned label array refers to the index of its exemplar'
# 'Keyword: preference (default=median similarity). How likely points choose themself as exemplar. Ranges from [-inf,0]'
# 'Keyword: prefmultiply (default=1). Scales the preference relative to the median'
# 'Keyword: lambda_damp (default=0.5). Damping of the iterative updating routine (higher=slower)'
# 'Keyword: maxiter (default=10). Max iterations of the same label outcome'
# 'Keyword: maxtries (default=10000). Max nr of tries. Useful if stuck in loop'
# 'Keyword: verb. Print label iteration and nr tries'
# 'Keyword: bit16 (default=0). 16-bit instead of 32-bit precision'
if bit16==1:
M=M.astype(np.float16)
S=aff_prop_s(M,preference,prefmultiply,bit16)
SM=M.shape
R=copy.deepcopy(S)*0
A=copy.deepcopy(S)*0
labels=np.arange(0,SM[0])
labels2=copy.deepcopy(labels)
now=0
ntries=0
print('Starting convergence loop.')
while ((now < maxiter) & (ntries < maxtries)):
R=aff_prop_r(S,A,R,lambda_damp,bit16)
A=aff_prop_a(A,R,lambda_damp)
solut=A+R
maxsolut=np.max(solut,axis=1)
for i in range(SM[0]):
tmp=np.where(solut[i,:]==maxsolut[i])
labels[i]=np.median(tmp)
if np.array_equal(labels,labels2):
now+=1
else:
labels2=copy.deepcopy(labels)
now=0
ntries+=1
if verbose:
print('ITER: ',now)
print('TRY: ',ntries)
return(labels)