From 95f235c8eea3b26ba64328690d699fe4c45a389f Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Fri, 6 Jul 2018 15:58:42 +0530 Subject: [PATCH 1/8] fixed the Geometric operations like contains/intersects --- .../spark3d/geometryObjects/BoxEnvelope.scala | 70 +++++++------------ .../OctreePartitionerTest.scala | 8 +-- .../spatialPartitioning/OctreeTest.scala | 12 ++-- 3 files changed, 36 insertions(+), 54 deletions(-) diff --git a/src/main/scala/com/spark3d/geometryObjects/BoxEnvelope.scala b/src/main/scala/com/spark3d/geometryObjects/BoxEnvelope.scala index dbfcdaa..107ed48 100644 --- a/src/main/scala/com/spark3d/geometryObjects/BoxEnvelope.scala +++ b/src/main/scala/com/spark3d/geometryObjects/BoxEnvelope.scala @@ -16,7 +16,6 @@ package com.astrolabsoftware.spark3d.geometryObjects import com.astrolabsoftware.spark3d.geometryObjects.Shape3D._ -import com.astrolabsoftware.spark3d.utils.Utils.sphericalToCartesian import scala.math._ @@ -460,11 +459,11 @@ class BoxEnvelope private( return false } - !(env.minX > maxX || - env.maxX < minX || - env.minY > maxY || - env.maxY < minY || - env.minZ > maxZ || + !(env.minX >= maxX || + env.maxX < minX || + env.minY >= maxY || + env.maxY < minY || + env.minZ >= maxZ || env.maxZ < minZ) } @@ -486,41 +485,24 @@ class BoxEnvelope private( using Utils.euclideantoSpherical(p). """) } + if (isNull) { return false } val envMinX = min(p1.x, min(p2.x, p3.x)) - if (envMinX > maxX) { - return false - } - val envMaxX = max(p1.x, max(p2.x, p3.x)) - if (envMaxX < minX) { - return false - } - val envMinY = min(p1.y, min(p2.y, p3.y)) - if (envMinY > maxY) { - return false - } - val envMaxY = max(p1.y, max(p2.y, p3.y)) - if (envMaxY < minY) { - return false - } - val envMinZ = min(p1.z, min(p2.z, p3.z)) - if (envMinZ > maxZ) { - return false - } - val envMaxZ = max(p1.z, max(p2.z, p3.z)) - if (envMaxZ < minZ) { - return false - } - true + !(envMinX >= maxX || + envMaxX < minX || + envMinY >= maxY || + envMaxY < minY || + envMinZ >= maxZ || + envMaxZ < minZ) } /** @@ -537,12 +519,12 @@ class BoxEnvelope private( return false } - !(x < minX || - x > maxX || - y < minY || - y > maxY || - z < minZ || - z > maxZ) + !(x >= maxX || + x < minX || + y >= maxY || + y < minY || + z >= maxZ || + z < minZ) } /** @@ -624,12 +606,12 @@ class BoxEnvelope private( return false } - x >= minX && - x <= maxX && + x >= minX && + x < maxX && y >= minY && - y <= maxY && + y < maxY && z >= minZ && - z <= maxZ + z < maxZ } @@ -645,12 +627,12 @@ class BoxEnvelope private( return false } - env.minX >= minX && - env.maxX <= maxX && + env.minX >= minX && + env.maxX < maxX && env.minY >= minY && - env.maxY <= maxY && + env.maxY < maxY && env.minZ >= minZ && - env.maxZ <= maxZ + env.maxZ < maxZ } /** diff --git a/src/test/scala/com/spark3d/spatialPartitioning/OctreePartitionerTest.scala b/src/test/scala/com/spark3d/spatialPartitioning/OctreePartitionerTest.scala index d9a3c0c..2e1fe4e 100644 --- a/src/test/scala/com/spark3d/spatialPartitioning/OctreePartitionerTest.scala +++ b/src/test/scala/com/spark3d/spatialPartitioning/OctreePartitionerTest.scala @@ -25,10 +25,10 @@ class OctreePartitionerTest extends FunSuite with BeforeAndAfterAll { test ("Can you correctly place a Point3D inside the Octree space?") { var valid_tree = new Octree(BoxEnvelope.apply(0.0, 4.0, 0.0, 4.0, 0.0, 4.0), 0, null, 2) - val element1 = BoxEnvelope.apply(0.0, 1.0, 0.0, 1.0, 0.0, 1.0) + val element1 = BoxEnvelope.apply(0.0, 0.9, 0.0, 0.9, 0.0, 0.9) val element2 = BoxEnvelope.apply(1.0, 3.0, 1.0, 3.0, 1.0, 3.0) - val element3 
= BoxEnvelope.apply(1.0, 2.0, 1.0, 2.0, 1.0, 2.0) - val element4 = BoxEnvelope.apply(0.0, 1.0, 1.0, 2.0, 0.0, 1.0) + val element3 = BoxEnvelope.apply(1.0, 1.9, 1.0, 1.9, 1.0, 1.9) + val element4 = BoxEnvelope.apply(0.0, 0.9, 1.0, 1.9, 0.0, 0.9) val data = new ListBuffer[BoxEnvelope] data += element1 data += element2 @@ -43,7 +43,7 @@ class OctreePartitionerTest extends FunSuite with BeforeAndAfterAll { assert(result.next._1 == 13) // case when object belongs to all partitions - spr = new ShellEnvelope(2, 2, 2, false, 1) + spr = new ShellEnvelope(2, 2, 2, false, 1.1) result = partitioner.placeObject(spr) var resultCount = 0 while (result.hasNext) { diff --git a/src/test/scala/com/spark3d/spatialPartitioning/OctreeTest.scala b/src/test/scala/com/spark3d/spatialPartitioning/OctreeTest.scala index 438cfb1..d0db2bf 100644 --- a/src/test/scala/com/spark3d/spatialPartitioning/OctreeTest.scala +++ b/src/test/scala/com/spark3d/spatialPartitioning/OctreeTest.scala @@ -44,12 +44,12 @@ class OctreeTest extends FunSuite with BeforeAndAfterAll { } test ("Can you insert the elements into a Octree and verify its correctness?") { - val element1 = BoxEnvelope.apply(0.0, 1.0, 0.0, 1.0, 0.0, 1.0) - val element2 = BoxEnvelope.apply(1.0, 3.0, 1.0, 3.0, 1.0, 3.0) + val element1 = BoxEnvelope.apply(0.0, 0.9, 0.0, 0.9, 0.0, 0.9) + val element2 = BoxEnvelope.apply(1.0, 2.9, 1.0, 2.9, 1.0, 2.9) valid_tree.insertElement(element1) valid_tree.insertElement(element2) - val spr = new ShellEnvelope(1.0, 1.0, 1.0, false, 1.0) + val spr = new ShellEnvelope(1.0, 1.0, 1.0, false, 0.9) var result = valid_tree.getElements(spr) assert(valid_tree.isLeaf) assert(result.size == 2) @@ -59,9 +59,9 @@ class OctreeTest extends FunSuite with BeforeAndAfterAll { assert(leafNodes.size == 1) assert(containsElement(leafNodes, tree_space)) - val element3 = BoxEnvelope.apply(1.0, 2.0, 1.0, 2.0, 1.0, 2.0) + val element3 = BoxEnvelope.apply(1.0, 1.9, 1.0, 1.9, 1.0, 1.9) valid_tree.insertElement(element3) - result = valid_tree.getElements(BoxEnvelope.apply(3.0, 4.0, 3.0, 4.0, 3.0, 4.0)) + result = valid_tree.getElements(BoxEnvelope.apply(3.0, 3.9, 3.0, 3.9, 3.0, 3.9)) assert(result.size == 1) assert(containsElement(result, element2)) result = valid_tree.getElements(element1) @@ -70,7 +70,7 @@ class OctreeTest extends FunSuite with BeforeAndAfterAll { assert(containsElement(result, element2)) assert(containsElement(result, element3)) - val element4 = BoxEnvelope.apply(0.0, 1.0, 1.0, 2.0, 0.0, 1.0) + val element4 = BoxEnvelope.apply(0.0, 0.9, 1.0, 1.9, 0.0, 0.9) valid_tree.insertElement(element4) result = valid_tree.getElements(element1) From 9d3e3b419689b77e410c456ee6e551d8f88ded2a Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Mon, 9 Jul 2018 11:59:59 +0530 Subject: [PATCH 2/8] fixed the octree dividing the node bug --- .../spark3d/spatialPartitioning/Octree.scala | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/src/main/scala/com/spark3d/spatialPartitioning/Octree.scala b/src/main/scala/com/spark3d/spatialPartitioning/Octree.scala index 4a386cf..9f1f80c 100644 --- a/src/main/scala/com/spark3d/spatialPartitioning/Octree.scala +++ b/src/main/scala/com/spark3d/spatialPartitioning/Octree.scala @@ -73,60 +73,64 @@ class Octree( private def splitBox(): Unit = { children = new Array[Octree](8) + val midX = (box.minX + box.maxX) / 2 + val midY = (box.minY + box.maxY) / 2 + val midZ = (box.minZ + box.maxZ) / 2 + children(CHILD_L_SW) = new Octree( BoxEnvelope.apply( - box.minX, (box.maxX - box.minX) / 2, - 
box.minY, (box.maxY - box.minY) / 2, - box.minZ, (box.maxZ - box.minZ) / 2), + box.minX, midX, + box.minY, midY, + box.minZ, midZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_L_SE) = new Octree( BoxEnvelope.apply( - (box.maxX - box.minX) / 2, box.maxX, - box.minY, (box.maxY - box.minY) / 2, - box.minZ, (box.maxZ - box.minZ) / 2), + midX, box.maxX, + box.minY, midY, + box.minZ, midZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_L_NW) = new Octree( BoxEnvelope.apply( - box.minX, (box.maxX - box.minX) / 2, - (box.maxY - box.minY) / 2, box.maxY, - box.minZ, (box.maxZ - box.minZ) / 2), + box.minX, midX, + midY, box.maxY, + box.minZ, midZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_L_NE) = new Octree( BoxEnvelope.apply( - (box.maxX - box.minX) / 2, box.maxX, - (box.maxY - box.minY) / 2, box.maxY, - box.minZ, (box.maxZ - box.minZ) / 2), + midX, box.maxX, + midY, box.maxY, + box.minZ, midZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_U_SW) = new Octree( BoxEnvelope.apply( - box.minX, (box.maxX - box.minX) / 2, - box.minY, (box.maxY - box.minY) / 2, - (box.maxZ - box.minZ) / 2, box.maxZ), + box.minX, midX, + box.minY, midY, + midZ, box.maxZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_U_SE) = new Octree( BoxEnvelope.apply( - (box.maxX - box.minX) / 2, box.maxX, - box.minY, (box.maxY - box.minY) / 2, - (box.maxZ - box.minZ) / 2, box.maxZ), + midX, box.maxX, + box.minY, midY, + midZ, box.maxZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_U_NW) = new Octree( BoxEnvelope.apply( - box.minX, (box.maxX - box.minX) / 2, - (box.maxY - box.minY) / 2, box.maxY, - (box.maxZ - box.minZ) / 2, box.maxZ), + box.minX, midX, + midY, box.maxY, + midZ, box.maxZ), level + 1, this, maxItemsPerNode, maxLevel) children(CHILD_U_NE) = new Octree( BoxEnvelope.apply( - (box.maxX - box.minX) / 2, box.maxX, - (box.maxY - box.minY) / 2, box.maxY, - (box.maxZ - box.minZ) / 2, box.maxZ), + midX, box.maxX, + midY, box.maxY, + midZ, box.maxZ), level + 1, this, maxItemsPerNode, maxLevel) } @@ -384,8 +388,8 @@ class Octree( val matchedLeaves = new ListBuffer[Octree] val traverseFunct: (Octree, BoxEnvelope) => Boolean = { (node, obj) => node.isLeaf && (node.box.intersects(obj) || - node.box.contains(obj) || - obj.contains(node.box)) + node.box.contains(obj)) +// obj.contains(node.box)) } dfsTraverse(traverseFunct, obj, matchedLeaves) From f74a2715b0afa8d4f2ddf6d24de5cd20cd4085e3 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Tue, 10 Jul 2018 15:54:17 +0530 Subject: [PATCH 3/8] added unique priority queue implementation based on the implementation in spark --- .../spatialOperator/SpatialQuery.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala index 9cbd8c7..936f6f5 100644 --- a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala +++ b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala @@ -18,6 +18,7 @@ package com.astrolabsoftware.spark3d.spatialOperator import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D import com.astrolabsoftware.spark3d.utils.GeometryObjectComparator +import com.astrolabsoftware.spark3d.utils.BoundedUniquePriorityQueue import org.apache.spark.rdd.RDD import com.astrolabsoftware.spark3d.spatialPartitioning._ @@ -25,6 +26,7 @@ import scala.collection.mutable import scala.collection.mutable.{HashSet, ListBuffer, PriorityQueue} import 
scala.reflect.ClassTag import scala.util.control.Breaks._ +import org.apache.spark.util.collection.{Utils => collectionUtils} object SpatialQuery { @@ -123,4 +125,30 @@ object SpatialQuery { } knn_f.toList } + + private def takeOrdered[A <: Shape3D: ClassTag](rdd: RDD[A], num: Int, queryObject: A, unique: Boolean = false)(implicit ord: Ordering[A]): Array[A] = { + + if (unique) { + if (num == 0) { + Array.empty + } else { + val mapRDDs = rdd.mapPartitions { items => + val queue = new BoundedUniquePriorityQueue[A](num)(ord.reverse) + queue ++= collectionUtils.takeOrdered(items, num)(ord) + Iterator.single(queue) + } + if (mapRDDs.partitions.length == 0) { + Array.empty + } else { + mapRDDs.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } + } + + } + + return rdd.takeOrdered(num)(new GeometryObjectComparator[A](queryObject.center)) + } } From df1ba17c8899b36e08348e36669e15ced578a948 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 11 Jul 2018 12:16:24 +0530 Subject: [PATCH 4/8] added the unique implementation --- .../spatialOperator/SpatialQuery.scala | 34 ++------------- src/main/scala/com/spark3d/utils/Utils.scala | 41 +++++++++++++++++++ 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala index 936f6f5..e9f0c9a 100644 --- a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala +++ b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala @@ -18,15 +18,13 @@ package com.astrolabsoftware.spark3d.spatialOperator import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D import com.astrolabsoftware.spark3d.utils.GeometryObjectComparator -import com.astrolabsoftware.spark3d.utils.BoundedUniquePriorityQueue +import com.astrolabsoftware.spark3d.utils.Utils.takeOrdered import org.apache.spark.rdd.RDD import com.astrolabsoftware.spark3d.spatialPartitioning._ -import scala.collection.mutable import scala.collection.mutable.{HashSet, ListBuffer, PriorityQueue} import scala.reflect.ClassTag import scala.util.control.Breaks._ -import org.apache.spark.util.collection.{Utils => collectionUtils} object SpatialQuery { @@ -40,8 +38,9 @@ object SpatialQuery { * @param k number of nearest neighbors are to be found * @return knn */ - def KNN[A <: Shape3D: ClassTag, B <:Shape3D: ClassTag](queryObject: A, rdd: RDD[B], k: Int): List[B] = { - val knn = rdd.takeOrdered(k)(new GeometryObjectComparator[B](queryObject.center)) + def KNN[T <: Shape3D: ClassTag](queryObject: T, rdd: RDD[T], k: Int, unique: Boolean = false): List[T] = { +// val knn = rdd.takeOrdered(k)(new GeometryObjectComparator[B](queryObject.center)) + val knn = takeOrdered[T](rdd, k, queryObject, unique)(new GeometryObjectComparator[T](queryObject.center)) knn.toList } @@ -126,29 +125,4 @@ object SpatialQuery { knn_f.toList } - private def takeOrdered[A <: Shape3D: ClassTag](rdd: RDD[A], num: Int, queryObject: A, unique: Boolean = false)(implicit ord: Ordering[A]): Array[A] = { - - if (unique) { - if (num == 0) { - Array.empty - } else { - val mapRDDs = rdd.mapPartitions { items => - val queue = new BoundedUniquePriorityQueue[A](num)(ord.reverse) - queue ++= collectionUtils.takeOrdered(items, num)(ord) - Iterator.single(queue) - } - if (mapRDDs.partitions.length == 0) { - Array.empty - } else { - mapRDDs.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord) - } - } - - } - - return rdd.takeOrdered(num)(new 
GeometryObjectComparator[A](queryObject.center)) - } } diff --git a/src/main/scala/com/spark3d/utils/Utils.scala b/src/main/scala/com/spark3d/utils/Utils.scala index 3cd250c..9263e60 100644 --- a/src/main/scala/com/spark3d/utils/Utils.scala +++ b/src/main/scala/com/spark3d/utils/Utils.scala @@ -15,7 +15,14 @@ */ package com.astrolabsoftware.spark3d.utils +import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D import com.astrolabsoftware.spark3d.geometryObjects._ +import com.google.common.collect.{Ordering => GuavaOrdering} + +import org.apache.spark.rdd.RDD + +import scala.reflect.ClassTag +import scala.collection.JavaConverters._ object Utils { @@ -102,4 +109,38 @@ object Utils { ra } } + + def takeOrdered[T <: Shape3D: ClassTag](rdd: RDD[T], num: Int, queryObject: T, unique: Boolean = false)(ord: Ordering[T]): Array[T] = { + + if (unique) { + if (num == 0) { + Array.empty + } else { + val mapRDDs = rdd.mapPartitions { items => + val queue = new BoundedUniquePriorityQueue[T](num)(ord.reverse) + queue ++= takeOrdered(items, num)(ord) + Iterator.single(queue) + } + if (mapRDDs.partitions.length == 0) { + return Array.empty + } else { + return mapRDDs.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } + } + + } + + return rdd.takeOrdered(num)(new GeometryObjectComparator[T](queryObject.center)) + } + + private def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = { + val ordering = new GuavaOrdering[T] { + override def compare(l: T, r: T): Int = ord.compare(l, r) + } + ordering.leastOf(input.asJava, num).iterator.asScala + } + } From 7a6c303ddf85504d2c1a2617fc4596a1df79a627 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 11 Jul 2018 12:53:47 +0530 Subject: [PATCH 5/8] added a Scala based implementation of the Unique Bounded Priority Queue --- .../utils/BoundedUniquePriorityQueue.scala | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala diff --git a/src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala b/src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala new file mode 100644 index 0000000..9298889 --- /dev/null +++ b/src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala @@ -0,0 +1,68 @@ +/* + * Copyright 2018 Mayur Bhosale + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.astrolabsoftware.spark3d.utils + +import java.io.Serializable + +import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D + +import scala.collection.generic.Growable +import collection.mutable.PriorityQueue + +class BoundedUniquePriorityQueue[A <: Shape3D](maxSize: Int)(implicit ord: Ordering[A]) + extends Iterable[A] with Growable[A] with Serializable { + + private val underlying = new PriorityQueue[A]()(ord) + + override def iterator: Iterator[A] = underlying.iterator + + override def size: Int = underlying.size + + override def ++=(xs: TraversableOnce[A]): this.type = { + xs.foreach { this += _ } + this + } + + override def +=(elem: A): this.type = { + if (!underlying.map(x => x.center).toList.contains(elem.center)) { + if (size < maxSize) { + underlying.enqueue(elem) + } else { + maybeReplaceLowest(elem) + } + } + + this + } + + override def +=(elem1: A, elem2: A, elems: A*): this.type = { + this += elem1 += elem2 ++= elems + } + + override def clear() { underlying.clear() } + + private def maybeReplaceLowest(a: A): Boolean = { + val head = underlying.head + if (head != null && ord.gt(a, head)) { + underlying.dequeue + underlying.enqueue(a) + true + } else { + false + } + } +} + From b96efb1deada129ee5726ea9833d89077f1d6503 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 11 Jul 2018 13:03:27 +0530 Subject: [PATCH 6/8] Revert "added a Scala based implementation of the Unique Bounded Priority Queue" This reverts commit 7a6c303ddf85504d2c1a2617fc4596a1df79a627. --- .../utils/BoundedUniquePriorityQueue.scala | 68 ------------------- 1 file changed, 68 deletions(-) delete mode 100644 src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala diff --git a/src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala b/src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala deleted file mode 100644 index 9298889..0000000 --- a/src/main/scala/com/spark3d/utils/BoundedUniquePriorityQueue.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2018 Mayur Bhosale - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.astrolabsoftware.spark3d.utils - -import java.io.Serializable - -import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D - -import scala.collection.generic.Growable -import collection.mutable.PriorityQueue - -class BoundedUniquePriorityQueue[A <: Shape3D](maxSize: Int)(implicit ord: Ordering[A]) - extends Iterable[A] with Growable[A] with Serializable { - - private val underlying = new PriorityQueue[A]()(ord) - - override def iterator: Iterator[A] = underlying.iterator - - override def size: Int = underlying.size - - override def ++=(xs: TraversableOnce[A]): this.type = { - xs.foreach { this += _ } - this - } - - override def +=(elem: A): this.type = { - if (!underlying.map(x => x.center).toList.contains(elem.center)) { - if (size < maxSize) { - underlying.enqueue(elem) - } else { - maybeReplaceLowest(elem) - } - } - - this - } - - override def +=(elem1: A, elem2: A, elems: A*): this.type = { - this += elem1 += elem2 ++= elems - } - - override def clear() { underlying.clear() } - - private def maybeReplaceLowest(a: A): Boolean = { - val head = underlying.head - if (head != null && ord.gt(a, head)) { - underlying.dequeue - underlying.enqueue(a) - true - } else { - false - } - } -} - From a26211654ebd10c25889eb10f28e5d9579a7c813 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 11 Jul 2018 13:03:38 +0530 Subject: [PATCH 7/8] Revert "added the unique implementation" This reverts commit df1ba17c8899b36e08348e36669e15ced578a948. --- .../spatialOperator/SpatialQuery.scala | 34 +++++++++++++-- src/main/scala/com/spark3d/utils/Utils.scala | 41 ------------------- 2 files changed, 30 insertions(+), 45 deletions(-) diff --git a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala index e9f0c9a..936f6f5 100644 --- a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala +++ b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala @@ -18,13 +18,15 @@ package com.astrolabsoftware.spark3d.spatialOperator import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D import com.astrolabsoftware.spark3d.utils.GeometryObjectComparator -import com.astrolabsoftware.spark3d.utils.Utils.takeOrdered +import com.astrolabsoftware.spark3d.utils.BoundedUniquePriorityQueue import org.apache.spark.rdd.RDD import com.astrolabsoftware.spark3d.spatialPartitioning._ +import scala.collection.mutable import scala.collection.mutable.{HashSet, ListBuffer, PriorityQueue} import scala.reflect.ClassTag import scala.util.control.Breaks._ +import org.apache.spark.util.collection.{Utils => collectionUtils} object SpatialQuery { @@ -38,9 +40,8 @@ object SpatialQuery { * @param k number of nearest neighbors are to be found * @return knn */ - def KNN[T <: Shape3D: ClassTag](queryObject: T, rdd: RDD[T], k: Int, unique: Boolean = false): List[T] = { -// val knn = rdd.takeOrdered(k)(new GeometryObjectComparator[B](queryObject.center)) - val knn = takeOrdered[T](rdd, k, queryObject, unique)(new GeometryObjectComparator[T](queryObject.center)) + def KNN[A <: Shape3D: ClassTag, B <:Shape3D: ClassTag](queryObject: A, rdd: RDD[B], k: Int): List[B] = { + val knn = rdd.takeOrdered(k)(new GeometryObjectComparator[B](queryObject.center)) knn.toList } @@ -125,4 +126,29 @@ object SpatialQuery { knn_f.toList } + private def takeOrdered[A <: Shape3D: ClassTag](rdd: RDD[A], num: Int, queryObject: A, unique: Boolean = false)(implicit ord: Ordering[A]): Array[A] = { + + if (unique) { + if (num == 0) { + Array.empty + } else { 
+ val mapRDDs = rdd.mapPartitions { items => + val queue = new BoundedUniquePriorityQueue[A](num)(ord.reverse) + queue ++= collectionUtils.takeOrdered(items, num)(ord) + Iterator.single(queue) + } + if (mapRDDs.partitions.length == 0) { + Array.empty + } else { + mapRDDs.reduce { (queue1, queue2) => + queue1 ++= queue2 + queue1 + }.toArray.sorted(ord) + } + } + + } + + return rdd.takeOrdered(num)(new GeometryObjectComparator[A](queryObject.center)) + } } diff --git a/src/main/scala/com/spark3d/utils/Utils.scala b/src/main/scala/com/spark3d/utils/Utils.scala index 9263e60..3cd250c 100644 --- a/src/main/scala/com/spark3d/utils/Utils.scala +++ b/src/main/scala/com/spark3d/utils/Utils.scala @@ -15,14 +15,7 @@ */ package com.astrolabsoftware.spark3d.utils -import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D import com.astrolabsoftware.spark3d.geometryObjects._ -import com.google.common.collect.{Ordering => GuavaOrdering} - -import org.apache.spark.rdd.RDD - -import scala.reflect.ClassTag -import scala.collection.JavaConverters._ object Utils { @@ -109,38 +102,4 @@ object Utils { ra } } - - def takeOrdered[T <: Shape3D: ClassTag](rdd: RDD[T], num: Int, queryObject: T, unique: Boolean = false)(ord: Ordering[T]): Array[T] = { - - if (unique) { - if (num == 0) { - Array.empty - } else { - val mapRDDs = rdd.mapPartitions { items => - val queue = new BoundedUniquePriorityQueue[T](num)(ord.reverse) - queue ++= takeOrdered(items, num)(ord) - Iterator.single(queue) - } - if (mapRDDs.partitions.length == 0) { - return Array.empty - } else { - return mapRDDs.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord) - } - } - - } - - return rdd.takeOrdered(num)(new GeometryObjectComparator[T](queryObject.center)) - } - - private def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = { - val ordering = new GuavaOrdering[T] { - override def compare(l: T, r: T): Int = ord.compare(l, r) - } - ordering.leastOf(input.asJava, num).iterator.asScala - } - } From c43b83fca455ad4ea2abb87b8845eea4aa3fc287 Mon Sep 17 00:00:00 2001 From: Mayur Bhosale Date: Wed, 11 Jul 2018 13:03:41 +0530 Subject: [PATCH 8/8] Revert "added unique priority queue implementation based on the implementation in spark" This reverts commit f74a2715b0afa8d4f2ddf6d24de5cd20cd4085e3. 
--- .../spatialOperator/SpatialQuery.scala | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala index 936f6f5..9cbd8c7 100644 --- a/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala +++ b/src/main/scala/com/spark3d/spatialOperator/SpatialQuery.scala @@ -18,7 +18,6 @@ package com.astrolabsoftware.spark3d.spatialOperator import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D import com.astrolabsoftware.spark3d.utils.GeometryObjectComparator -import com.astrolabsoftware.spark3d.utils.BoundedUniquePriorityQueue import org.apache.spark.rdd.RDD import com.astrolabsoftware.spark3d.spatialPartitioning._ @@ -26,7 +25,6 @@ import scala.collection.mutable import scala.collection.mutable.{HashSet, ListBuffer, PriorityQueue} import scala.reflect.ClassTag import scala.util.control.Breaks._ -import org.apache.spark.util.collection.{Utils => collectionUtils} object SpatialQuery { @@ -125,30 +123,4 @@ object SpatialQuery { } knn_f.toList } - - private def takeOrdered[A <: Shape3D: ClassTag](rdd: RDD[A], num: Int, queryObject: A, unique: Boolean = false)(implicit ord: Ordering[A]): Array[A] = { - - if (unique) { - if (num == 0) { - Array.empty - } else { - val mapRDDs = rdd.mapPartitions { items => - val queue = new BoundedUniquePriorityQueue[A](num)(ord.reverse) - queue ++= collectionUtils.takeOrdered(items, num)(ord) - Iterator.single(queue) - } - if (mapRDDs.partitions.length == 0) { - Array.empty - } else { - mapRDDs.reduce { (queue1, queue2) => - queue1 ++= queue2 - queue1 - }.toArray.sorted(ord) - } - } - - } - - return rdd.takeOrdered(num)(new GeometryObjectComparator[A](queryObject.center)) - } }
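
The net effect of patches 1 and 2 is two related geometry changes: Octree.splitBox now splits each node at the midpoint (min + max) / 2 instead of the half-extent (max - min) / 2, and the BoxEnvelope containment/intersection tests treat the upper bounds as exclusive, i.e. half-open intervals [min, max), so that an object lying exactly on a shared face ends up in exactly one child. The following is a minimal, self-contained Scala sketch of those two rules only; it does not use the real BoxEnvelope or Octree classes, and all names in it are illustrative.

// Minimal 1-D sketch of the two octree fixes above (illustrative names only).
object OctreeFixSketch {

  // Patch 2: the split plane must be the midpoint (min + max) / 2, not the
  // half-extent (max - min) / 2. For a node spanning [2.0, 6.0] the old code
  // produced 2.0 (the node's own lower bound); the fix produces 4.0.
  def midpoint(min: Double, max: Double): Double = (min + max) / 2

  // Patch 1: containment on a half-open interval [min, max), so a coordinate
  // lying exactly on a shared face belongs to only one of the two children.
  def containsHalfOpen(min: Double, max: Double, x: Double): Boolean =
    x >= min && x < max

  def main(args: Array[String]): Unit = {
    val (lo, hi) = (2.0, 6.0)
    val mid = midpoint(lo, hi)          // 4.0 -- correct split plane
    val oldBoundary = (hi - lo) / 2     // 2.0 -- pre-fix value, a degenerate child bound

    // A point sitting exactly on the split plane is owned by the upper child only.
    val onPlane = mid
    val inLower = containsHalfOpen(lo, mid, onPlane)   // false
    val inUpper = containsHalfOpen(mid, hi, onPlane)   // true
    println(s"mid=$mid oldBoundary=$oldBoundary lower=$inLower upper=$inUpper")
  }
}

This is presumably also why the tests in patches 1 and 2 shrink several BoxEnvelope upper bounds (e.g. 1.0 to 0.9): with exclusive upper bounds, a box whose maximum coordinate coincides exactly with a cell boundary is no longer contained by that cell, so the test elements are shrunk to sit strictly inside a single cell and keep the expected placements deterministic.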