Skip to content

Commit

Permalink
Revert "added unique priority queue implementation based on the imple…
Browse files Browse the repository at this point in the history
…mentation in spark"

This reverts commit f74a271.
  • Loading branch information
mayurdb committed Jul 11, 2018
1 parent a262116 commit c43b83f
Showing 1 changed file with 0 additions and 28 deletions.
28 changes: 0 additions & 28 deletions src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package com.astrolabsoftware.spark3d.spatialOperator

import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D
import com.astrolabsoftware.spark3d.utils.GeometryObjectComparator
import com.astrolabsoftware.spark3d.utils.BoundedUniquePriorityQueue
import org.apache.spark.rdd.RDD
import com.astrolabsoftware.spark3d.spatialPartitioning._

import scala.collection.mutable
import scala.collection.mutable.{HashSet, ListBuffer, PriorityQueue}
import scala.reflect.ClassTag
import scala.util.control.Breaks._
import org.apache.spark.util.collection.{Utils => collectionUtils}

object SpatialQuery {

Expand Down Expand Up @@ -125,30 +123,4 @@ object SpatialQuery {
}
knn_f.toList
}

private def takeOrdered[A <: Shape3D: ClassTag](rdd: RDD[A], num: Int, queryObject: A, unique: Boolean = false)(implicit ord: Ordering[A]): Array[A] = {

if (unique) {
if (num == 0) {
Array.empty
} else {
val mapRDDs = rdd.mapPartitions { items =>
val queue = new BoundedUniquePriorityQueue[A](num)(ord.reverse)
queue ++= collectionUtils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}
if (mapRDDs.partitions.length == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}

}

return rdd.takeOrdered(num)(new GeometryObjectComparator[A](queryObject.center))
}
}

0 comments on commit c43b83f

Please sign in to comment.