-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathPSCAN.scala
45 lines (38 loc) · 1.97 KB
/
PSCAN.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.apache.spark.graphx._
import scala.reflect.ClassTag
/**
 * Structural graph clustering on GraphX: edges between structurally dissimilar
 * vertices are cut, and every connected component that remains is one cluster.
 */
object PSCAN {

  /**
   * Clusters the vertices of `graph` by structural similarity.
   *
   * The similarity of an edge (u, v) is `|N(u) ∩ N(v)| / sqrt(|N(u)| * |N(v)|)`, where
   * `N(x)` is the set of vertices adjacent to `x` (edge direction ignored) plus `x`
   * itself. Edges with a similarity below `epsilon` are dropped and the connected
   * components of the remaining graph are used as clusters.
   *
   * @tparam VD vertex attribute type of the input graph (not inspected)
   * @tparam ED edge attribute type of the input graph (returned unchanged)
   * @param graph   graph to cluster
   * @param epsilon minimum similarity an edge needs in order to be kept
   * @return the original graph structure and edge attributes, with every vertex attribute
   *         replaced by its cluster identifier as assigned by GraphX `connectedComponents`
   */
  def pscan[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      epsilon: Double = 0.5
  ): Graph[VertexId, ED] = {
    // Closed neighbourhood (neighbours plus the vertex itself) of every vertex that has at
    // least one incident edge. Only the endpoint ids are read, so no vertex or edge
    // attributes need to be shipped to the triplets (TripletFields.None).
    val closedNeighbourhoods: VertexRDD[Set[VertexId]] = graph
      .aggregateMessages[Set[VertexId]](
        sendMsg = ctx => {
          ctx.sendToSrc(Set(ctx.dstId))
          ctx.sendToDst(Set(ctx.srcId))
        },
        mergeMsg = _ ++ _,
        tripletFields = TripletFields.None
      )
      .mapValues((vid, adjacent) => adjacent + vid)

    // Vertices without any edge received no message; their closed neighbourhood is
    // just the vertex itself.
    val withNeighbourhoods: Graph[Set[VertexId], ED] =
      graph.outerJoinVertices(closedNeighbourhoods) { (vid, _, found) =>
        found.getOrElse(Set(vid))
      }

    // Compute the similarity of every connected vertex pair.
    val scored: Graph[Set[VertexId], Double] = withNeighbourhoods.mapTriplets { triplet =>
      val shared = triplet.srcAttr.intersect(triplet.dstAttr).size
      // Multiply as Double: the Int product of two large neighbourhood sizes can
      // overflow, which would turn the similarity into NaN and drop the edge.
      shared / math.sqrt(triplet.srcAttr.size.toDouble * triplet.dstAttr.size.toDouble)
    }

    // Remove edges whose similarity is smaller than epsilon; all vertices are kept.
    val pruned: Graph[Set[VertexId], Double] = scored.subgraph(epred = _.attr >= epsilon)

    // Every connected component of the pruned graph is a cluster.
    val components: Graph[VertexId, Double] = pruned.connectedComponents()

    // Return the original graph with the vertex attribute set to the cluster identifier.
    // -1 is only a fallback for a vertex missing from the component result.
    graph.outerJoinVertices(components.vertices) { (_, _, clusterId) =>
      clusterId.getOrElse(-1L)
    }
  }
}