New partitioner: KD-Tree partitioner #127

Open: wants to merge 49 commits into base: master

Commits (49):
f7c72a8
First Part
Jul 22, 2019
c97e69a
Second Update
Jul 23, 2019
9f486d1
Updated
Jul 23, 2019
1d282dc
Added
Jul 23, 2019
a6f1b74
Added
Jul 24, 2019
4382b67
KDtree option is added
Jul 24, 2019
82e085a
KDtree option is added
Jul 24, 2019
a7a385d
KDtree is added
Jul 24, 2019
ff2086a
KDtree option has been added
Jul 24, 2019
1bf8b37
Updated
Jul 25, 2019
bf3a9ee
Updated
Jul 25, 2019
195169e
Updated
Jul 25, 2019
1be165f
Boundary box problem is fixed
Jul 29, 2019
973bb67
The number of partitions is determined, then the boundary boxes for pa…
Jul 29, 2019
be83043
placePoints method is ready
Jul 29, 2019
3c21725
The level of KDtree which is used to find the boundary boxes for partiti…
Jul 29, 2019
f27582b
it is used to check the KDtree
Jul 29, 2019
6d76411
Validation is added
Jul 30, 2019
3de67ec
updated
Aug 5, 2019
b2c83b1
Validation is added
Aug 5, 2019
e65baa3
Updated
Aug 5, 2019
ac60b6c
KDtree is added
Aug 5, 2019
f119781
it is used for developing KD-tree
Aug 5, 2019
e3f3a8f
containKD method is added
Aug 5, 2019
472264d
updated
Aug 5, 2019
e02828e
KDtree is added
Aug 5, 2019
5ed8977
Bounding box is fixed
Aug 5, 2019
1827df4
Updated
Aug 5, 2019
ce2745e
nothing
Aug 5, 2019
0d9004e
placePoint method for KD-tree is added
Aug 5, 2019
e7ded79
Bounding box is fixed
Aug 5, 2019
2606485
Merge remote-tracking branch 'upstream/master'
Aug 5, 2019
1d54c58
Nothing
Aug 5, 2019
0048ce4
Nothing
Aug 5, 2019
0f23805
Tracing
Aug 5, 2019
300d28a
Merge branch 'master' into origin
Aug 5, 2019
00a8b3a
Simple commit test
Aug 6, 2019
7e6dcae
Simple commit test
abualia4 Aug 6, 2019
97420ec
KDtree partitioner is added
abualia4 Aug 6, 2019
1775a8e
Documenting the code
abualia4 Aug 8, 2019
2189f56
Documenting the code
abualia4 Aug 8, 2019
d9c8b57
Documenting the code
abualia4 Aug 8, 2019
71f15e5
Documenting the code
abualia4 Aug 8, 2019
e4b541e
Documenting the code
abualia4 Aug 8, 2019
1eb1665
Documenting the code
abualia4 Aug 8, 2019
3feb9f7
Documenting the code
abualia4 Aug 8, 2019
9065432
Documenting the code
abualia4 Aug 8, 2019
57b6705
This bash code is used by Ahmed Alia to evaluate his work
abualia4 Aug 8, 2019
3927c4c
To perform some visual inspection of the KDtree partitioner
abualia4 Aug 8, 2019
41 changes: 41 additions & 0 deletions arun_scala.sh
@@ -0,0 +1,41 @@
#!/bin/bash

SBT_VERSION_SPARK=2.11
VERSION=0.3.1

# Compile the code
sbt ++2.11.8 package

PACK=com.github.astrolabsoftware:spark-fits_2.11:0.7.1
SF=target/scala-2.11/spark3d_2.11-${VERSION}.jar
HP=lib/jhealpix.jar

#MASTERURL=spark://134.158.75.222:7077
MASTERURL=yarn
# fitsfn=hdfs://134.158.75.222:8020//lsst/LSST1Y/out_srcs_s1_3.fits
# fitsfn=hdfs://134.158.75.222:8020/user/alia/a.fits
fitsfn=hdfs://134.158.75.222:8020/user/julien.peloton/dc2

# This loop is used to evaluate the KDtree partitioner
sum=0
for i in {1..10}
do
echo "Run number is $i"
start_time="$(date -u +%s)"
spark-submit \
--master ${MASTERURL} \
--driver-memory 4g --executor-memory 28g --executor-cores 17 --total-executor-cores 102 \
--jars ${SF},${HP} --packages ${PACK} \
--class com.astrolabsoftware.spark3d.examples.Test \
target/scala-${SBT_VERSION_SPARK}/spark3d_${SBT_VERSION_SPARK}-${VERSION}.jar \
$fitsfn 1 "position_x_mock,position_y_mock,position_z_mock" true "kdtree" 8 "col"
end_time="$(date -u +%s)"
elapsed="$(($end_time-$start_time))"
echo "Total of $elapsed seconds elapsed for process"
sum="$(($sum+$elapsed))"
wait
done
avgT="$(($sum/10))"
echo "average elapsed time is $avgT "


7 changes: 4 additions & 3 deletions run_part_scala.sh
@@ -10,8 +10,9 @@ PACK=com.github.astrolabsoftware:spark-fits_2.11:0.7.1
SF=target/scala-2.11/spark3d_2.11-${VERSION}.jar
HP=lib/jhealpix.jar

-MASTERURL=
-fitsfn=
+MASTERURL=spark://134.158.75.222:7077
+fitsfn=hdfs://134.158.75.222:8020//lsst/LSST1Y/out_srcs_s1_0.fits

Review comment (Member):
The general rule is to not commit URL in a repo. Please keep the previous empty variables:

MASTERURL=
fitsfn=


# "order" "col" "range" "rep"
for method in "range" "col"
@@ -22,6 +23,6 @@ do
--jars ${SF},${HP} --packages ${PACK} \
--class com.astrolabsoftware.spark3d.examples.PartitioningDF \
target/scala-${SBT_VERSION_SPARK}/spark3d_${SBT_VERSION_SPARK}-${VERSION}.jar \
-$fitsfn 1 "Z_COSMO,RA,DEC" true "onion" 102 ${method}
+$fitsfn 1 "Z_COSMO,RA,DEC" true "octree" 102 ${method}
done
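The loop above selects the partitioning method positionally on the command line. Inside spark3d these choices end up in a plain options map; the keys below ("gridtype", "coordSys", "geometry", "colnames") appear in the Partitioners.scala and Repartitioning.scala diffs of this PR, while the concrete values are illustrative assumptions, not a verified invocation:

```scala
// Sketch only: an options map of the kind consumed by
// com.astrolabsoftware.spark3d.Partitioners / Repartitioning.prePartition.
// Keys come from the diffs in this PR; values are illustrative assumptions.
val options: Map[String, String] = Map(
  "geometry" -> "points",          // "points" or "spheres"
  "colnames" -> "Z_COSMO,RA,DEC",  // coordinate columns of the DataFrame
  "coordSys" -> "spherical",       // "spherical" or "cartesian"
  "gridtype" -> "kdtree"           // the new grid type added by this PR
)
```

The "kdtree" value is what routes `Partitioners.get` to the new `KDtreePartitioner` branch shown further down.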

36 changes: 23 additions & 13 deletions run_viz_scala.sh
@@ -11,21 +11,31 @@ SF=target/scala-2.11/spark3d_2.11-${VERSION}.jar
HP=lib/jhealpix.jar,lib/plotly-assembly-0.3.0-SNAPSHOT.jar

MASTERURL=yarn
-inputfn="LSST1Y/out_srcs_s1_0.fits"
+#inputfn="LSST1Y/out_srcs_s1_0.fits"
#inputfn=hdfs://134.158.75.222:8020//lsst/LSST1Y/out_srcs_s1_3.fits
#spark-submit \
# --master ${MASTERURL} \
# --driver-memory 4g --executor-memory 28g --executor-cores 17 --total-executor-cores 102 \
# --jars ${SF},${HP} --packages ${PACK} \
# --class com.astrolabsoftware.spark3d.examples.VisualizePart \
#target/scala-${SCAlA_VERSION}/spark3d_${SCAlA_VERSION}-${VERSION}.jar \
# $inputfn "Z_COSMO,RA,DEC" true "onion" 8 0.0001 "username" "api_key"

spark-submit \
--master ${MASTERURL} \
--driver-memory 4g --executor-memory 28g --executor-cores 17 --total-executor-cores 102 \
--jars ${SF},${HP} --packages ${PACK} \
--class com.astrolabsoftware.spark3d.examples.VisualizePart \
target/scala-${SCAlA_VERSION}/spark3d_${SCAlA_VERSION}-${VERSION}.jar \
$inputfn "Z_COSMO,RA,DEC" true "onion" 8 0.0001 "username" "api_key"
#inputfn=hdfs://134.158.75.222:8020/user/julien.peloton/dc2
#spark-submit \
# --master ${MASTERURL} \
# --driver-memory 4g --executor-memory 28g --executor-cores 17 --total-executor-cores 102 \
# --jars ${SP},${HP} --packages ${PACK} \
# --class com.astrolabsoftware.spark3d.examples.VisualizePart \
# target/scala-${SCAlA_VERSION}/spark3d_${SCAlA_VERSION}-${VERSION}.jar \
# $inputfn "position_x_mock,position_y_mock,position_z_mock" false "octree" 512 0.001 "abualia4" "GFz0cUDumcKcnhpMd8qw"
Review comment (Member):
Again, the general rule is to never push sensitive information in a repo! Could you remove your username and api key?


-inputfn="dc2"
+inputfn=hdfs://134.158.75.222:8020/user/julien.peloton/dc2_rand
spark-submit \
--master ${MASTERURL} \
--driver-memory 4g --executor-memory 28g --executor-cores 17 --total-executor-cores 102 \
--jars ${SP},${HP} --packages ${PACK} \
--class com.astrolabsoftware.spark3d.examples.VisualizePart \
--driver-memory 4g --executor-memory 28g --executor-cores 17 --total-executor-cores 102 \
--jars ${SP},${HP} --packages ${PACK} \
--class com.astrolabsoftware.spark3d.examples.VisualizePart \
target/scala-${SCAlA_VERSION}/spark3d_${SCAlA_VERSION}-${VERSION}.jar \
-$inputfn "position_x_mock,position_y_mock,position_z_mock" false "octree" 512 0.001 "username" "api_key"
+$inputfn "position_x_mock,position_y_mock,position_z_mock" false "kdtree" 512 0.001 "abualia4" "GFz0cUDumcKcnhpMd8qw"
Review comment (Member):
Again, the general rule is to never push sensitive information in a repo! Could you remove your username and api key?


55 changes: 42 additions & 13 deletions src/main/scala/com/spark3d/Partitioners.scala
@@ -21,22 +21,25 @@ import scala.util.Random
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

import com.astrolabsoftware.spark3d.geometryObjects.Shape3D._

import com.astrolabsoftware.spark3d.geometryObjects._
import com.astrolabsoftware.spark3d.geometryObjects.Shape3D.Shape3D
import com.astrolabsoftware.spark3d.geometryObjects.Point3D

import com.astrolabsoftware.spark3d.spatialPartitioning.SpatialPartitioner
import com.astrolabsoftware.spark3d.spatialPartitioning.OnionPartitioning
import com.astrolabsoftware.spark3d.spatialPartitioning.OnionPartitioner
import com.astrolabsoftware.spark3d.spatialPartitioning.Octree
import com.astrolabsoftware.spark3d.spatialPartitioning.OctreePartitioning
import com.astrolabsoftware.spark3d.spatialPartitioning.OctreePartitioner
import com.astrolabsoftware.spark3d.utils.Utils.getSampleSize

import com.astrolabsoftware.spark3d.spatialPartitioning.KDtree
import com.astrolabsoftware.spark3d.spatialPartitioning.KDtreePartitioning
import com.astrolabsoftware.spark3d.spatialPartitioning.KDtreePartitioner
/**
* Main object to retrieve SpatialPartitioner.
*/
class Partitioners(df : DataFrame, options: Map[String, String]) extends Serializable {

// Definition of the coordinate system. Spherical or cartesian
val isSpherical : Boolean = options("coordSys") match {
case "spherical" => true
@@ -80,10 +83,10 @@ class Partitioners(df : DataFrame, options: Map[String, String]) extends Seriali
Geometry not understood! You must choose between:
points or spheres
""")
}


/**
}
/**
* Define a spatial partitioning for rawRDD, and return the partitioner.
* The list of available partitioning can be found in utils/GridType.
* By default, the outgoing level of parallelism is the same as the incoming
@@ -112,24 +115,27 @@ class Partitioners(df : DataFrame, options: Map[String, String]) extends Seriali
(You put: $numPartitions)
""")
}

// Add here new cases.
val partitioner = gridtype match {
case "onion" => {

// Initialise our space
val partitioning = new OnionPartitioning

partitioning.LinearOnionPartitioning(
numPartitionsRaw,
partitioning.getMaxZ(rawRDD),
isSpherical
)

// Grab the grid elements
val grids = partitioning.getGrids

// Build our partitioner
-new OnionPartitioner(grids)
+var x= new OnionPartitioner(grids)
+x
}
case "octree" => {
// taking 20% of the data as a sample
@@ -153,14 +159,34 @@ class Partitioners(df : DataFrame, options: Map[String, String]) extends Seriali
val grids = partitioning.getGrids
new OctreePartitioner(octree, grids)
}
//for KDtree
case "kdtree" => {
val dataCount = rawRDD.count
val sampleSize = getSampleSize(dataCount, numPartitionsRaw)
val samples:List[Point3D] = rawRDD.takeSample(false, sampleSize,
new Random(dataCount).nextInt(dataCount.asInstanceOf[Int])).toList.map(x => x.asInstanceOf[Point3D])

// Determine the tree level used for partitioning; this also fixes the number of partitions.
// The number of partitions is a power of two: 1, 2, 4, 8, 16, 32, and so on.
val log2 = (x: Int) => floor(log10(x)/log10(2.0)).asInstanceOf[Int]
val levelPartitioning=log2(numPartitionsRaw) +1

val kdtree=new KDtree( )
val partitioning = KDtreePartitioning.apply(samples, kdtree, levelPartitioning)
val grids:List[BoxEnvelope] = partitioning.getGrids
new KDtreePartitioner(kdtree,grids)

}


// Other cases not handled. RTree in prep.
case _ => throw new AssertionError("""
Unknown grid type! See utils.GridType for available grids.""")
}

// Apply the partitioner and return the RDD
partitioner
partitioner

}

/**
@@ -208,5 +234,8 @@ class Partitioners(df : DataFrame, options: Map[String, String]) extends Seriali

dataBoundary
}



}
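The "kdtree" case above derives the tree depth from the requested number of partitions via `floor(log10(x)/log10(2.0))`. A minimal, Spark-free sketch of that arithmetic follows; the helper names (`levelFor`, `leavesAt`) are mine, not spark3d's, and the floating-point log ratio of the diff is replaced here by an exact integer computation of the same quantity:

```scala
// floor(log2(x)) computed exactly on integers; the diff above computes the
// same quantity as floor(log10(x) / log10(2.0)).
def log2(x: Int): Int = 31 - Integer.numberOfLeadingZeros(x)

// Depth at which the KD-tree is cut into partition boxes (root at level 1),
// mirroring `levelPartitioning = log2(numPartitionsRaw) + 1` in the diff.
def levelFor(numPartitions: Int): Int = log2(numPartitions) + 1

// A complete KD-tree cut at `level` has 2^(level - 1) leaf boxes, so the
// effective partition count is numPartitions rounded down to a power of two.
def leavesAt(level: Int): Int = 1 << (level - 1)
```

For the `"kdtree" 8` invocation in arun_scala.sh this gives level 4 and exactly 8 leaf boxes, while a non-power-of-two request such as 6 rounds down to 4 boxes, matching the diff's comment that the number of partitions is a power of 2.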

23 changes: 15 additions & 8 deletions src/main/scala/com/spark3d/Repartitioning.scala
@@ -21,6 +21,11 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.Column


import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset

import com.astrolabsoftware.spark3d.Partitioners
import com.astrolabsoftware.spark3d.spatialPartitioning.KeyPartitioner
@@ -56,7 +61,6 @@ object Repartitioning {
* @return Input DataFrame plus an additional column `partition_id`.
*/
def prePartition(df : DataFrame, options: Map[String, String], numPartitions : Int = -1) : DataFrame = {

// Change the number of partitions if wanted
val numOfPartitions = numPartitions match {
case -1 => df.rdd.getNumPartitions
@@ -80,7 +84,7 @@ }
}

// Other implemented repartitioners
-      case grid @ ("onion" | "octree") => {
+      case grid @ ("onion" | "octree" |"kdtree") => {
// Definition of the coordinate system. Spherical or cartesian
val isSpherical : Boolean = options("coordSys") match {
case "spherical" => true
@@ -96,13 +100,13 @@

val P = new Partitioners(df, options)
val partitioner = P.get(numOfPartitions)

// Add a column with the new partition indices
val dfExt = geometry match {
case "points" => {
// UDF for the repartitioning
val placePointsUDF = udf[Int, Double, Double, Double, Boolean](partitioner.placePoints)

val placePointsUDF = udf[Int, Double, Double, Double, Boolean](partitioner.placePoints)
df.withColumn("partition_id",
placePointsUDF(
col(colnames(0)).cast("double"),
@@ -111,11 +115,12 @@ lit(isSpherical)
lit(isSpherical)
)
)

}
case "spheres" => {
// UDF for the repartitioning
val placePointsUDF = udf[Int, Double, Double, Double, Double, Boolean](partitioner.placeSpheres)

df.withColumn("partition_id",
placePointsUDF(
col(colnames(0)).cast("double"),
@@ -132,13 +137,15 @@ """)
""")
}
dfExt


}
case _ => throw new AssertionError("""
Gridtype not understood! You must choose between:
onion, octree, or current
""")
}

dfout
}

@@ -186,7 +193,7 @@ object Repartitioning {
* +-------------------+-------------------+------------------+------------+
*/
def repartitionByCol(df: DataFrame, colname: String, preLabeled: Boolean, numPartitions: Int = -1): DataFrame = {

// Build a Map (k=df.col -> v=partition_id)
// to allow the use of standard (Int) partitioners (can be costly).
val mapPart : Map[Any, Int] = preLabeled match {
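The `placePointsUDF` wiring in prePartition ultimately asks the partitioner which grid cell contains each point. Below is a Spark-free sketch of that containment lookup; `BoxEnvelope` here is a simplified stand-in for spark3d's `geometryObjects.BoxEnvelope`, and `placePoint` is illustrative rather than the actual `placePoints` implementation:

```scala
// Simplified stand-in for spark3d's BoxEnvelope (axis-aligned 3D box).
case class BoxEnvelope(minX: Double, maxX: Double,
                       minY: Double, maxY: Double,
                       minZ: Double, maxZ: Double) {
  def contains(x: Double, y: Double, z: Double): Boolean =
    x >= minX && x <= maxX &&
    y >= minY && y <= maxY &&
    z >= minZ && z <= maxZ
}

// Illustrative leaf lookup: the partition id of a point is the index of the
// first KD-tree leaf box ("grid") that contains it; -1 if no box matches.
def placePoint(grids: List[BoxEnvelope], x: Double, y: Double, z: Double): Int =
  grids.indexWhere(_.contains(x, y, z))
```

Wrapped in a UDF over the coordinate columns, as in the `"points"` branch above, this integer becomes the `partition_id` column added to the DataFrame.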