diff --git a/docs/01_installation.md b/docs/01_installation.md index bcb91cc..c7b1ea9 100644 --- a/docs/01_installation.md +++ b/docs/01_installation.md @@ -20,7 +20,7 @@ another version, feel free to contact us. In addition to Spark, the library has You can link spark3D to your project (either `spark-shell` or `spark-submit`) by specifying the coordinates: ```bash -toto:~$ spark-submit --packages "com.github.astrolabsoftware:spark3d_2.11:0.1.1" <...> +toto:~$ spark-submit --packages "com.github.astrolabsoftware:spark3d_2.11:0.1.3" <...> ``` It might not contain the latest features though (see *Building from source*). @@ -69,8 +69,8 @@ result on the screen, plus details of the coverage at First produce a jar of the spark3D library, and then launch a spark-shell by specifying the external dependencies: ```bash -toto:~$ JARS="target/scala-2.11/spark3d_2.11-0.1.1.jar,lib/jhealpix.jar" -toto:~$ PACKAGES="com.github.astrolabsoftware:spark-fits_2.11:0.4.0" +toto:~$ JARS="target/scala-2.11/spark3d_2.11-0.1.3.jar,lib/jhealpix.jar" +toto:~$ PACKAGES="com.github.astrolabsoftware:spark-fits_2.11:0.6.0" toto:~$ spark-shell --jars $JARS --packages $PACKAGES ``` @@ -83,7 +83,7 @@ scala> // etc... Note that if you make a fat jar (that is building with `sbt assembly` and not `sbt package`), you do not need to specify external dependencies as they are already included in the resulting jar: ```bash -toto:~$ FATJARS="target/scala-2.11/spark3D-assembly-0.1.1.jar" +toto:~$ FATJARS="target/scala-2.11/spark3D-assembly-0.1.3.jar" toto:~$ spark-shell --jars $FATJARS ``` diff --git a/docs/02_introduction.md b/docs/02_introduction.md index 347a9ce..99aabe1 100644 --- a/docs/02_introduction.md +++ b/docs/02_introduction.md @@ -59,13 +59,13 @@ val boxes = new BoxEnvelope(p1: Point3D) ## Supported data sources -One of the goal of spark3D is to support as many data source as possible. Currently, we focused our effort on: FITS, CSV, JSON, and TXT. While the first one is widely used in the Astrophysics community, the others are widely used in the industry. +One of the goals of spark3D is to support as many data sources as possible. Currently, you can load any Spark DataSource V2! That means CSV, JSON, Parquet, Avro, ... In addition, you can load scientific data formats following the Spark DataSource API, such as [FITS](https://github.com/astrolabsoftware/spark-fits), [ROOT](https://github.com/diana-hep/spark-root) (<= 6.11) or [HDF5](https://github.com/LLNL/spark-hdf5)! In this tutorial we will review the steps to simply create RDD from 3D data sets. A 3DRDD is simply a RDD whose elements are 3D objects. Currently, spark3D supports 2 kind of objects: points (`Point3D`) and spheres (`ShellEnvelope`). Note that spheres are a sub-case of shells. ### Loading Point3D -A point is an object with 3 spatial coordinates. In spark3D, you can choose the coordinate system between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a text file (CSV, JSON, or TXT) whose columns are labeled `x`, `y` and `z`, the cartesian coordinates of points: +A point is an object with 3 spatial coordinates. In spark3D, you can choose the coordinate system between cartesian `(x, y, z)` and spherical `(r, theta, phi)`.
Let's suppose we have a file which contains columns labeled `x`, `y` and `z`, the cartesian coordinates of points: ```scala import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD // We assume filename contains at least 3 columns whose names are `colnames` // Order of columns in the file does not matter, as they will be re-aranged // according to `colnames`. -val pointRDD = new Point3DRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean) +val pointRDD = new Point3DRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean, format: String, options: Map[String, String]) ``` -With FITS data, with data in the HDU #1, you would just do +`format` and `options` control how your data is read. +* `format` is the name of the data source as registered in Spark. For example: `csv`, `json`, `org.dianahep.sparkroot`, ... For Spark built-in formats, see [here](https://github.com/apache/spark/blob/301bff70637983426d76b106b7c659c1f28ed7bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L560). +* `options`: Options to pass to the `DataFrameReader` (see below for examples). + +**CSV / JSON** ```scala -import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD +// Spark datasource / replace "csv" with "json" for a JSON file +val format = "csv" +// Options to pass to the DataFrameReader - optional +val options = Map("header" -> "true") +``` -// We assume hdu#1 of filename contains at least 3 columns whose names are `colnames` -// Order of columns in the file does not matter, as they will be re-aranged -// according to `colnames`. -val hdu = 1 -val pointRDD = new Point3DRDD(spark: SparkSession, filename: String, hdu: Int, colnames: String, isSpherical: Boolean) +**TXT** +```scala +// Spark datasource / use csv for a text file with a custom separator +val format = "csv" +// Options to pass to the DataFrameReader - optional +val options = Map("header" -> "true", "sep" -> " ") +``` + +**FITS** +```scala +// Spark datasource +val format = "fits" // or "com.astrolabsoftware.sparkfits" +// Options to pass to the DataFrameReader - optional +val options = Map("hdu" -> "1") +``` + +**HDF5** +```scala +// Spark datasource +val format = "hdf5" // or "gov.llnl.spark.hdf" +// Options to pass to the DataFrameReader - optional +val options = Map("dataset" -> "/toto") +``` + +**ROOT (<= 6.11)** +```scala +// Spark datasource +val format = "org.dianahep.sparkroot" +// No specific options are needed (this is the loader's default) +val options = Map("" -> "") ``` The resulting RDD is a `RDD[Point3D]`. Note that there is no space between columns labels. @@ -93,7 +126,7 @@ The resulting RDD is a `RDD[Point3D]`. Note that there is no space between colum ### Loading Sphere A sphere is defined by its center (3 spatial coordinates) plus a radius. -In spark3D, you can choose the coordinate system of the center between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a text file (CSV, JSON, or TXT) whose columns are labeled `r`, `theta`, `phi`, the spherical coordinates and `radius`: +In spark3D, you can choose the coordinate system of the center between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a file which contains columns labeled `r`, `theta`, `phi`, the spherical coordinates, and `radius`.
Similarly to `Point3DRDD` you would use: ```scala import com.astrolabsoftware.spark3d.spatial3DRDD.SphereRDD @@ -101,77 +134,11 @@ import com.astrolabsoftware.spark3d.spatial3DRDD.SphereRDD // We assume filename contains at least 4 columns whose names are `colnames`. // Order of columns in the file does not matter, as they will be re-aranged // according to `colnames`. -val pointRDD = new SphereRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean) +val sphereRDD = new SphereRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean, format: String, options: Map[String, String]) ``` -The resulting RDD is a `RDD[Sphere]`. +The resulting RDD is a `RDD[ShellEnvelope]`. ### Loading Shells and Boxes TBD. - -### Loading data from a different data source - -Since the scientific domain considered here is mostly the Astrophysics domain, -the natural storage or exchange file format is the FITS format. -Therefore we consider as part of the problem, the possibility to allow FITS files -to be directly injected into the HDFS infrastructure, so as to develop a Spark based applications. The usual [cfitsio](https://heasarc.gsfc.nasa.gov/fitsio/fitsio.html) library, as well as the FITS I/O format are not adapted to a distributed file system as HDFS. -Therefore we will have to develop low level Reader/Writer services, -to support direct access to FITS data, without copy nor conversion needs. -To tackle this challenge, we started a new project called -[spark-fits](https://github.com/astrolabsoftware/spark-fits), which provides a -Spark connector for FITS data, and a Scala library for manipulating FITS file. - -We plan to release more in the future (HDF5 and ROOT on the TODO list!), and you are welcome to submit requests for specific data sources! -Alternatively you can define your own routine to read it. Add a new routine in `Loader.scala`: - -```scala -/** - * Your doc is important - */ -def Point3DRDDFromMySource(spark : SparkSession, filename : String, colnames : String, - isSpherical: Boolean, ): RDD[Point3D] { - - // Read the data using your data source - val df = spark.read.option(...) - - // Grab the name of columns - val csplit = colnames.split(",") - - // Select the 3 columns (x, y, z) - // and cast to double in case. - val rawRDD = df.select( - col(csplit(0)).cast("double"), - col(csplit(1)).cast("double"), - col(csplit(2)).cast("double") - ) - // DF to RDD - .rdd - // map to Point3D - .map(x => new Point3D( - x.getDouble(0), x.getDouble(1), x.getDouble(2), isSpherical) - ) - - rawRDD -} - -``` - -and then update `Point3DRDD.scala`: - -```scala -/** - * Your doc is important - */ -def this(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean, ) { - this(Point3DRDDFromMySource(spark, filename, colnames, isSpherical, ), isSpherical) -} -``` - -and finally load your data using: - -```scala -val myPoints = new Point3DRDD(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean, ) -``` - -here the example makes use of the DataFrame API, but you can use the RDD API as well to read your data. 
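For readers skimming this diff, here is a minimal end-to-end sketch of the new loader API described in the introduction above. File paths, column names, and the app name are placeholder assumptions; it assumes the spark3d 0.1.3 artifacts plus the spark-fits connector on the classpath:

```scala
import org.apache.spark.sql.SparkSession
import com.astrolabsoftware.spark3d.spatial3DRDD.{Point3DRDD, SphereRDD}

// Placeholder session; in spark-shell a `spark` session already exists
val spark = SparkSession.builder()
  .appName("spark3d-loaders")
  .getOrCreate()

// Points from a headered CSV file (hypothetical path and columns)
val points = new Point3DRDD(
  spark, "data/points.csv", "x,y,z",
  false, "csv", Map("header" -> "true"))

// Spheres from HDU #1 of a FITS file (requires spark-fits)
val spheres = new SphereRDD(
  spark, "data/spheres.fits", "x,y,z,radius",
  false, "fits", Map("hdu" -> "1"))

// Both wrappers expose the underlying RDD as `rawRDD`
println(points.rawRDD.count())
println(spheres.rawRDD.count())
```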
diff --git a/docs/03_partitioning.md b/docs/03_partitioning.md index 02dc12e..f2ac562 100644 --- a/docs/03_partitioning.md +++ b/docs/03_partitioning.md @@ -36,12 +36,13 @@ val spark = SparkSession.builder() // Data is in src/test/resources val fn = "astro_obs.fits" -val hdu = 1 val columns = "Z_COSMO,RA,DEC" val spherical = true +val format = "fits" // or "com.astrolabsoftware.sparkfits" +val options = Map("hdu" -> "1") // Load the data -val pointRDD = new Point3DRDD(spark, fn, hdu, columns, spherical) +val pointRDD = new Point3DRDD(spark, fn, columns, spherical, format, options) // nPart is the wanted number of partitions. Default is pointRDD partition number. val pointRDD_partitioned = pointRDD.spatialPartitioning(GridType.LINEARONIONGRID, nPart) @@ -53,10 +54,10 @@ val pointRDD_partitioned = pointRDD.spatialPartitioning(GridType.LINEARONIONGRID ### Octree Partitioning -In the following example, we load `Point3D` data, and we re-partition it with the octree partitioning +In the following example, we load `ShellEnvelope` data (spheres), and we re-partition it with the octree partitioning ```scala -import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD +import com.astrolabsoftware.spark3d.spatial3DRDD.SphereRDD import com.astrolabsoftware.spark3d.utils.GridType import org.apache.spark.sql.SparkSession @@ -67,12 +68,13 @@ val spark = SparkSession.builder() // Data is in src/test/resources val fn = "cartesian_spheres.fits" -val hdu = 1 val columns = "x,y,z,radius" val spherical = false +val format = "fits" // or "com.astrolabsoftware.sparkfits" +val options = Map("hdu" -> "1") // Load the data -val sphereRDD = new Point3DRDD(spark, fn, hdu, columns, spherical) +val sphereRDD = new SphereRDD(spark, fn, columns, spherical, format, options) // nPart is the wanted number of partitions (floored to a power of 8). // Default is sphereRDD partition number. diff --git a/docs/04_query.md b/docs/04_query.md index 59b04e0..441f2bc 100644 --- a/docs/04_query.md +++ b/docs/04_query.md @@ -26,12 +26,13 @@ val spark = SparkSession.builder() // Data files are in src/test/resources val fn = "astro_obs.fits" -val hdu = 1 val columns = "Z_COSMO,RA,DEC" val spherical = true +val format = "fits" // or "com.astrolabsoftware.sparkfits" +val options = Map("hdu" -> "1") // Load a RDD[Point3D] from the FITS file -val objects = new Point3DRDD(spark, fn, hdu, columns, spherical) +val objects = new Point3DRDD(spark, fn, columns, spherical, format, options) // Define our envelope, here a sphere. val center = new Point3D(0.9, 0.0, 0.0, spherical) @@ -64,13 +65,14 @@ val spark = SparkSession.builder() // Data files are in src/test/resources val fnA = "astro_obs.fits" val fnB = "astro_obs2.fits" -val hdu = 1 val columns = "Z_COSMO,RA,DEC" val spherical = true +val format = "fits" // or "com.astrolabsoftware.sparkfits" +val options = Map("hdu" -> "1") // Load the two data sets -val setA = new Point3DRDD(spark, fnA, hdu, columns, spherical) -val setB = new Point3DRDD(spark, fnB, hdu, columns, spherical) +val setA = new Point3DRDD(spark, fnA, columns, spherical, format, options) +val setB = new Point3DRDD(spark, fnB, columns, spherical, format, options) ``` By default, the two sets are partitioned randomly (in the sense points spatially close are probably not in the same partition).
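Putting the two migrated docs pages together, here is a hedged sketch of the load-then-partition flow (same test files and columns as above; the partitioner-sharing step between the two sets is exercised by the tests further down in this patch):

```scala
import org.apache.spark.sql.SparkSession
import com.astrolabsoftware.spark3d.utils.GridType
import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD

val spark = SparkSession.builder()
  .appName("spark3d-partitioning")
  .getOrCreate()

// The HDU index now travels inside the DataFrameReader options
val options = Map("hdu" -> "1")
val setA = new Point3DRDD(spark, "astro_obs.fits", "Z_COSMO,RA,DEC", true, "fits", options)
val setB = new Point3DRDD(spark, "astro_obs2.fits", "Z_COSMO,RA,DEC", true, "fits", options)

// Re-partition set A into 100 onion shells, so that spatially close
// points land in the same partition before any cross-match
val setA_part = setA.spatialPartitioning(GridType.LINEARONIONGRID, 100)
```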
diff --git a/src/main/scala/com/spark3d/examples/CrossMatch.scala b/src/main/scala/com/spark3d/examples/CrossMatch.scala index 3fd462a..c32ca1e 100644 --- a/src/main/scala/com/spark3d/examples/CrossMatch.scala +++ b/src/main/scala/com/spark3d/examples/CrossMatch.scala @@ -76,7 +76,7 @@ object CrossMatch { val fnB_fits = args(1).toString // HDU index - val hdu = args(2).toInt + val hdu = args(2).toString // Columns to load val columns = args(3).toString @@ -91,8 +91,9 @@ object CrossMatch { val kind = args(6).toString // Load the data as Point3DRDD - val pointRDDA = new Point3DRDD(spark, fnA_fits, hdu, columns, true) - val pointRDDB = new Point3DRDD(spark, fnB_fits, hdu, columns, true) + val options = Map("hdu" -> hdu) + val pointRDDA = new Point3DRDD(spark, fnA_fits, columns, true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB_fits, columns, true, "fits", options) // Re-partition the space and cache the result val pointRDD_partA = pointRDDA.spatialPartitioning( diff --git a/src/main/scala/com/spark3d/examples/OnionSpace.scala b/src/main/scala/com/spark3d/examples/OnionSpace.scala index 03ea1e5..a16cf13 100644 --- a/src/main/scala/com/spark3d/examples/OnionSpace.scala +++ b/src/main/scala/com/spark3d/examples/OnionSpace.scala @@ -67,8 +67,9 @@ object OnionSpace { val fn_fits = args(0).toString // Load the data as Point3DRDD + val options = Map("hdu" -> args(1).toString) val pointRDD = new Point3DRDD( - spark, fn_fits, args(1).toInt, args(2).toString, true) + spark, fn_fits, args(2).toString, true, "fits", options) // Count the number of partition before, and number of elements per partition val partitionsBefore = pointRDD.rawRDD.mapPartitions( diff --git a/src/main/scala/com/spark3d/spatial3DRDD/Loader.scala b/src/main/scala/com/spark3d/spatial3DRDD/Loader.scala index 23023ae..7a71835 100644 --- a/src/main/scala/com/spark3d/spatial3DRDD/Loader.scala +++ b/src/main/scala/com/spark3d/spatial3DRDD/Loader.scala @@ -23,63 +23,63 @@ import org.apache.spark.rdd.RDD /** * Put here routine to load data for a specific data format - * Currently available: CSV, JSON, TXT, FITS + * Currently available: all Spark DataSource V2 compatible formats! + * i.e. CSV, JSON, TXT, Avro, Parquet, FITS, HDF5, ROOT (<= 6.11), ... */ object Loader { /** - * Construct a RDD[Point3D] from CSV, JSON or TXT data. + * Construct a RDD[Point3D] from any data source registered in Spark. + * For more information about available official connectors: + * `https://spark-packages.org/?q=tags%3A%22Data%20Sources%22` + * + * This includes: CSV, JSON, TXT, Avro, Parquet, FITS, ROOT, HDF5, ... + * * {{{ - * // CSV - * val fn = "src/test/resources/astro_obs.csv" - * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true) - * // JSON - * val fn = "src/test/resources/astro_obs.json" - * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true) - * // TXT - * val fn = "src/test/resources/astro_obs.txt" - * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true) + * // Here is an example with a CSV file containing + * // 3 spherical coordinates columns labeled Z_COSMO,RA,Dec.
+ * + * // Filename + * val fn = "path/to/file.csv" + * // Spark datasource + * val format = "csv" + * // Options to pass to the DataFrameReader - optional + * val options = Map("header" -> "true") + * + * // Load the data as RDD[Point3D] + * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true, format, options) * }}} * * @param spark : (SparkSession) * The spark session * @param filename : (String) - * File name where the data is stored. Extension must be explicitly - * written (.cvs, .json, or .txt) + * File name where the data is stored. * @param colnames : (String) * Comma-separated names of (x, y, z) columns. Example: "Z_COSMO,RA,Dec". * @param isSpherical : (Boolean) * If true, it assumes that the coordinates of the Point3D are (r, theta, phi). * Otherwise, it assumes cartesian coordinates (x, y, z). + * @param format : (String) + * The name of the data source as registered in Spark. For example: + * - text + * - csv + * - json + * - com.astrolabsoftware.sparkfits + * - org.dianahep.sparkroot + * - gov.llnl.spark.hdf or hdf5 + * @param options : (Map[String, String]) + * Options to pass to the DataFrameReader. Default is no options. * @return (RDD[Point3D]) * * */ - def Point3DRDDFromText(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean): RDD[Point3D] = { + def Point3DRDDFromV2( + spark : SparkSession, filename : String, + colnames : String, isSpherical: Boolean, format: String, + options: Map[String, String] = Map("" -> "")): RDD[Point3D] = { - val df = filename match { - case x if x.contains(".csv") => { - spark.read - .option("header", true) - .csv(filename) - } - case x if x.contains(".json") => { - spark.read - .option("header", true) - .json(filename) - } - case x if x.contains(".txt") => { - spark.read - .option("header", true) - .option("sep", " ") - .csv(filename) - } - case _ => throw new AssertionError(""" - I do not understand the file format. Accepted extensions are: - .csv, .json, .txt, or .text - You can also load FITS file using the HDU option (see Point3DRDDFromFITS) - """) - } + // Generic load for v2 datasource + val df = spark.read.format(format).options(options).load(filename) // Grab the name of columns val csplit = colnames.split(",") @@ -102,67 +102,25 @@ } /** - * Construct a RDD[Point3D] from FITS data. - * {{{ - * val fn = "src/test/resources/astro_obs.fits" - * val rdd = new Point3DRDDFromFITS(spark, fn, 1, "Z_COSMO,RA,Dec", true) - * }}} + * Construct a RDD[ShellEnvelope] from any data source registered in Spark. + * For more information about available official connectors: + * `https://spark-packages.org/?q=tags%3A%22Data%20Sources%22` * - * @param spark : (SparkSession) - * The spark session - * @param filename : (String) - * File name where the data is stored - * @param hdu : (Int) - * HDU to load. - * @param colnames : (String) - * Comma-separated names of (x, y, z) columns. Example: "Z_COSMO,RA,Dec". - * @param isSpherical : (Boolean) - * If true, it assumes that the coordinates of the Point3D are (r, theta, phi). - * Otherwise, it assumes cartesian coordinates (x, y, z). Default is false. - * @return (RDD[Point3D]) + * This includes: CSV, JSON, TXT, Avro, Parquet, FITS, ROOT, HDF5, ...
* - */ - def Point3DRDDFromFITS(spark : SparkSession, filename : String, hdu : Int, - colnames : String, isSpherical: Boolean): RDD[Point3D] = { - - // Load the data as DataFrame using spark-fits - val df = spark.read - .format("com.sparkfits") - .option("hdu", hdu) - .load(filename) - - // Grab the name of columns - val csplit = colnames.split(",") - - // Select the 3 columns (x, y, z) - // and cast to double in case. - val rawRDD = df.select( - col(csplit(0)).cast("double"), - col(csplit(1)).cast("double"), - col(csplit(2)).cast("double") - ) - // DF to RDD - .rdd - // map to Point3D - .map(x => new Point3D( - x.getDouble(0), x.getDouble(1), x.getDouble(2), isSpherical) - ) - - rawRDD - } - - /** - * Construct a RDD[ShellEnvelope] from CSV, JSON or TXT data. * {{{ - * // CSV - * val fn = "src/test/resources/cartesian_spheres.csv" - * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false) - * // JSON - * val fn = "src/test/resources/cartesian_spheres.json" - * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false) - * // TXT - * val fn = "src/test/resources/cartesian_spheres.txt" - * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false) + * // Here is an example with a CSV file containing + * // 3 cartesian coordinates + 1 radius columns labeled x,y,z,radius. + * + * // Filename + * val fn = "path/to/file.csv" + * // Spark datasource + * val format = "csv" + * // Options to pass to the DataFrameReader - optional + * val options = Map("header" -> "true") + * + * // Load the data as RDD[ShellEnvelope] + * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false, format, options) * }}} * * @param spark : (SparkSession) @@ -177,95 +135,31 @@ * If true, it assumes that the coordinates of the center of * the ShellEnvelope are (r, theta, phi). * Otherwise, it assumes cartesian coordinates (x, y, z). Default is false. + * @param format : (String) + * The name of the data source as registered in Spark. For example: + * - text + * - csv + * - json + * - com.astrolabsoftware.sparkfits + * - org.dianahep.sparkroot + * - gov.llnl.spark.hdf or hdf5 + * @param options : (Map[String, String]) + * Options to pass to the DataFrameReader. Default is no options. * @return (RDD[ShellEnvelope]) * */ - def SphereRDDFromText( - spark : SparkSession, filename : String, colnames : String, - isSpherical: Boolean = false): RDD[ShellEnvelope] = { + def SphereRDDFromV2( + spark : SparkSession, filename : String, + colnames : String, isSpherical: Boolean, format: String, + options: Map[String, String] = Map("" -> "")): RDD[ShellEnvelope] = { - val df = filename match { - case x if x.contains(".csv") => { - spark.read - .option("header", true) - .csv(filename) - } - case x if x.contains(".json") => { - spark.read - .option("header", true) - .json(filename) - } - case x if x.contains(".txt") => { - spark.read - .option("header", true) - .option("sep", " ") - .csv(filename) - } - case _ => throw new AssertionError(""" - I do not understand the file format. Accepted extensions are: - .csv, .json, .txt, or .text - You can also load FITS file using the HDU option (see Point3DRDDFromFITS) - """) - } + // Generic load for v2 datasource + val df = spark.read.format(format).options(options).load(filename) // Grab the name of columns val csplit = colnames.split(",") - // Select the 3 columns (x, y, z) - // and cast to double in case.
- val rawRDD = df.select( - col(csplit(0)).cast("double"), - col(csplit(1)).cast("double"), - col(csplit(2)).cast("double"), - col(csplit(3)).cast("double") - ) - // DF to RDD - .rdd - // map to ShellEnvelope - .map(x => new ShellEnvelope( - x.getDouble(0), x.getDouble(1), x.getDouble(2), isSpherical, x.getDouble(3)) - ) - - rawRDD - } - - /** - * Construct a RDD[ShellEnvelope] from FITS data. - * {{{ - * val fn = "src/test/resources/cartesian_spheres.fits" - * val sphereRDD = new SphereRDD(spark, fn, 1, "x,y,z,radius", false) - * }}} - * - * @param spark : (SparkSession) - * The spark session - * @param filename : (String) - * File name where the data is stored - * @param hdu : (Int) - * HDU to load. - * @param colnames : (String) - * Comma-separated names of (x, y, z, r) columns to read. - * Example: "Z_COSMO,RA,Dec,Radius". - * @param isSpherical : (Boolean) - * If true, it assumes that the coordinates of the center of - * the ShellEnvelope are (r, theta, phi). - * Otherwise, it assumes cartesian coordinates (x, y, z). Default is false. - * @return (RDD[ShellEnvelope) - * - */ - def SphereRDDFromFITS( - spark : SparkSession, filename : String, hdu : Int, - colnames : String, isSpherical: Boolean = false): RDD[ShellEnvelope] = { - - // Load the data as DataFrame using spark-fits - val df = spark.read - .format("com.sparkfits") - .option("hdu", hdu) - .load(filename) - - // Grab the name of columns - val csplit = colnames.split(",") - - // Select the 3 columns (x, y, z) + // Select the 3 columns (x, y, z) + radius // and cast to double in case. val rawRDD = df.select( col(csplit(0)).cast("double"), diff --git a/src/main/scala/com/spark3d/spatial3DRDD/Point3DRDD.scala b/src/main/scala/com/spark3d/spatial3DRDD/Point3DRDD.scala index dab7db2..0a3099b 100644 --- a/src/main/scala/com/spark3d/spatial3DRDD/Point3DRDD.scala +++ b/src/main/scala/com/spark3d/spatial3DRDD/Point3DRDD.scala @@ -26,59 +26,53 @@ import org.apache.spark.rdd.RDD class Point3DRDD(rdd : RDD[Point3D], override val isSpherical: Boolean) extends Shape3DRDD[Point3D] { /** - * Construct a Point3DRDD from CSV, JSON or TXT data. + * Construct a RDD[Point3D] from any data source registered in Spark. + * For more information about available official connectors: + * `https://spark-packages.org/?q=tags%3A%22Data%20Sources%22` + * + * This includes: CSV, JSON, TXT, FITS, ROOT, HDF5, ... + * + * {{{ - * // CSV - * val fn = "src/test/resources/astro_obs.csv" - * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true) - * // JSON - * val fn = "src/test/resources/astro_obs.json" - * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true) - * // TXT - * val fn = "src/test/resources/astro_obs.txt" - * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true) + * // Here is an example with a CSV file containing + * // 3 spherical coordinates columns labeled Z_COSMO,RA,Dec. + * + * // Filename + * val fn = "path/to/file.csv" + * // Spark datasource + * val format = "csv" + * // Options to pass to the DataFrameReader - optional + * val options = Map("header" -> "true") + * + * // Load the data as RDD[Point3D] + * val rdd = new Point3DRDD(spark, fn, "Z_COSMO,RA,Dec", true, format, options) * }}} * * @param spark : (SparkSession) * The spark session * @param filename : (String) - * File name where the data is stored. Extension must be explicitly - * written (.cvs, .json, .txt, or .text) + * File name where the data is stored. * @param colnames : (String) - * Comma-separated names of (x, y, z) columns. Example: "Z_COSMO,RA,Dec".
+ * Comma-separated names of (x, y, z) columns. Example: "Z_COSMO,RA,Dec". * @param isSpherical : (Boolean) * If true, it assumes that the coordinates of the Point3D are (r, theta, phi). * Otherwise, it assumes cartesian coordinates (x, y, z). + * @param format : (String) + * The name of the data source as registered in Spark. For example: + * - text + * - csv + * - json + * - com.astrolabsoftware.sparkfits + * - org.dianahep.sparkroot + * - gov.llnl.spark.hdf or hdf5 + * @param options : (Map[String, String]) + * Options to pass to the DataFrameReader. Default is no options. * @return (RDD[Point3D]) * * */ - def this(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean) { - this(Point3DRDDFromText(spark, filename, colnames, isSpherical), isSpherical) - } - - /** - * Class to make a Point3D RDD from FITS data. - * {{{ - * val fn = "src/test/resources/astro_obs.fits" - * val p3DRDD = new Point3DRDD(spark, fn, 1, "Z_COSMO,RA,Dec", true) - * }}} - * - * @param spark : (SparkSession) - * The spark session - * @param filename : (String) - * File name where the data is stored - * @param hdu : (Int) - * HDU to load. - * @param colnames : (String) - * Comma-separated names of columns. Example: "Z_COSMO,RA,Dec". - * @param isSpherical : (Boolean) - * If true, it assumes that the coordinates of the Point3D are (r, theta, phi). - * Otherwise, it assumes cartesian coordinates (x, y, z). Default is false. - * - */ - def this(spark : SparkSession, filename : String, hdu : Int, colnames : String, isSpherical: Boolean) { - this(Point3DRDDFromFITS(spark, filename, hdu, colnames, isSpherical), isSpherical) + def this(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean, + format: String, options: Map[String, String] = Map("" -> "")) { + this(Point3DRDDFromV2(spark, filename, colnames, isSpherical, format, options), isSpherical) } // Raw partitioned RDD diff --git a/src/main/scala/com/spark3d/spatial3DRDD/SphereRDD.scala b/src/main/scala/com/spark3d/spatial3DRDD/SphereRDD.scala index 93b6346..64b4cb9 100644 --- a/src/main/scala/com/spark3d/spatial3DRDD/SphereRDD.scala +++ b/src/main/scala/com/spark3d/spatial3DRDD/SphereRDD.scala @@ -23,20 +23,24 @@ import org.apache.spark.sql.functions.col import org.apache.spark.rdd.RDD -class SphereRDD(rdd : RDD[ShellEnvelope], override val isSpherical: Boolean) extends Shape3DRDD[ShellEnvelope] { +class SphereRDD(rdd : RDD[ShellEnvelope], + override val isSpherical: Boolean) extends Shape3DRDD[ShellEnvelope] { /** * Construct a RDD[ShellEnvelope] from CSV, JSON or TXT data. * {{{ - * // CSV - * val fn = "src/test/resources/cartesian_spheres.csv" - * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false) - * // JSON - * val fn = "src/test/resources/cartesian_spheres.json" - * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false) - * // TXT - * val fn = "src/test/resources/cartesian_spheres.txt" - * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", false) + * // Here is an example with a CSV file containing + * // 3 cartesian coordinates + 1 radius columns labeled x,y,z,radius. 
+ * + * // Filename + * val fn = "path/to/file.csv" + * // Spark datasource + * val format = "csv" + * // Options to pass to the DataFrameReader - optional + * val options = Map("header" -> "true") + * + * // Load the data as RDD[ShellEnvelope] + * val rdd = new SphereRDD(spark, fn, "x,y,z,radius", true, format, options) * }}} * * @param spark : (SparkSession) @@ -51,38 +55,25 @@ class SphereRDD(rdd : RDD[ShellEnvelope], override val isSpherical: Boolean) ext * If true, it assumes that the coordinates of the center of * the ShellEnvelope are (r, theta, phi). * Otherwise, it assumes cartesian coordinates (x, y, z). Default is false. + * @param format : (String) + * The name of the data source as registered in Spark. For example: + * - text + * - csv + * - json + * - com.astrolabsoftware.sparkfits + * - org.dianahep.sparkroot + * - gov.llnl.spark.hdf or hdf5 + * @param options : (Map[String, String]) + * Options to pass to the DataFrameReader. Default is no options. * @return (RDD[ShellEnvelope]) * */ - def this(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean) { - this(SphereRDDFromText(spark, filename, colnames, isSpherical), isSpherical) - } - - /** - * Construct a RDD[ShellEnvelope] from FITS data. - * {{{ - * val fn = "src/test/resources/cartesian_spheres.fits" - * val sphereRDD = new SphereRDD(spark, fn, 1, "x,y,z,radius", false) - * }}} - * - * @param spark : (SparkSession) - * The spark session - * @param filename : (String) - * File name where the data is stored - * @param hdu : (Int) - * HDU to load. - * @param colnames : (String) - * Comma-separated names of (x, y, z, r) columns to read. - * Example: "Z_COSMO,RA,Dec,Radius". - * @param isSpherical : (Boolean) - * If true, it assumes that the coordinates of the center of - * the ShellEnvelope are (r, theta, phi). - * Otherwise, it assumes cartesian coordinates (x, y, z). Default is false. - * @return (RDD[ShellEnvelope) - * - */ - def this(spark : SparkSession, filename : String, hdu : Int, colnames : String, isSpherical: Boolean) { - this(SphereRDDFromFITS(spark, filename, hdu, colnames, isSpherical), isSpherical) + def this(spark : SparkSession, filename : String, + colnames : String, isSpherical: Boolean, + format: String, options: Map[String, String] = Map("" -> "")) { + this(SphereRDDFromV2(spark, filename, colnames, isSpherical, format, options), + isSpherical + ) } // Raw partitioned RDD @@ -95,7 +86,8 @@ class SphereRDD(rdd : RDD[ShellEnvelope], override val isSpherical: Boolean) ext * @param rdd : (RDD[ShellEnvelope]) * RDD whose elements are ShellEnvelope instances. * @param isSpherical : (Boolean) - * If true, it assumes that the coordinates of the ShellEnvelope center are (r, theta, phi). + * If true, it assumes that the coordinates of the ShellEnvelope + * center are (r, theta, phi). * Otherwise, it assumes cartesian coordinates (x, y, z). 
* */ diff --git a/src/test/scala/com/spark3d/geometryObjects/ShellEnvelopeTest.scala b/src/test/scala/com/spark3d/geometryObjects/ShellEnvelopeTest.scala index f028645..fb8c617 100644 --- a/src/test/scala/com/spark3d/geometryObjects/ShellEnvelopeTest.scala +++ b/src/test/scala/com/spark3d/geometryObjects/ShellEnvelopeTest.scala @@ -201,6 +201,21 @@ class ShellEnvelopeTest extends FunSuite with BeforeAndAfterAll { assert(env.outerRadius == 12.6) } + test("Can you catch an error when intersecting a shell with unknown shape?") { + val env = new ShellEnvelope(valid_env) + assert(!null_env.intersects(env)) + + env.innerRadius = 2.0 + env.outerRadius = 4.0 + assert(!valid_env.intersects(env)) + + val unknown = new nonShape + val exception = intercept[AssertionError] { + env.intersects(unknown) + } + assert(exception.getMessage.contains("Cannot perform intersection because the type of shape is unknown!")) + } + test("Can you check if the two shell Envelopes intersect each other?") { val env = new ShellEnvelope(valid_env) assert(!null_env.intersects(env)) diff --git a/src/test/scala/com/spark3d/spatial3DRDD/LoaderTest.scala b/src/test/scala/com/spark3d/spatial3DRDD/LoaderTest.scala index a6bba0f..9ab679a 100644 --- a/src/test/scala/com/spark3d/spatial3DRDD/LoaderTest.scala +++ b/src/test/scala/com/spark3d/spatial3DRDD/LoaderTest.scala @@ -73,64 +73,58 @@ class LoaderTest extends FunSuite with BeforeAndAfterAll { val fns_wrong = "src/test/resources/cartesian_spheres.wrong" test("FITS: can you read points?") { - val pointRDD = new Point3DRDD(spark, fn_fits, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDD = new Point3DRDD(spark, fn_fits, "Z_COSMO,RA,DEC", true, "fits", options) assert(pointRDD.isInstanceOf[Point3DRDD] && pointRDD.rawRDD.count() == 20000) } test("FITS: can you read spheres?") { - val sRDD = new SphereRDD(spark, fns_fits, 1, "x,y,z,radius", false) + val options = Map("hdu" -> "1") + val sRDD = new SphereRDD(spark, fns_fits, "x,y,z,radius", false, "fits", options) assert(sRDD.isInstanceOf[SphereRDD] && sRDD.rawRDD.count() == 20000) } test("CSV: can you read points?") { - val pointRDD = new Point3DRDD(spark, fn_csv, "Z_COSMO,RA,DEC", true) + val options = Map("header" -> "true") + val pointRDD = new Point3DRDD(spark, fn_csv, "Z_COSMO,RA,DEC", true, "csv", options) assert(pointRDD.isInstanceOf[Point3DRDD] && pointRDD.rawRDD.count() == 20000) } test("CSV: can you read spheres?") { - val sRDD = new SphereRDD(spark, fns_csv, "x,y,z,radius", false) + val options = Map("header" -> "true") + val sRDD = new SphereRDD(spark, fns_csv, "x,y,z,radius", false, "csv", options) assert(sRDD.isInstanceOf[SphereRDD] && sRDD.rawRDD.count() == 20000) } test("JSON: can you read points?") { - val pointRDD = new Point3DRDD(spark, fn_json, "Z_COSMO,RA,DEC", true) + val options = Map("header" -> "true") + val pointRDD = new Point3DRDD(spark, fn_json, "Z_COSMO,RA,DEC", true, "json", options) assert(pointRDD.isInstanceOf[Point3DRDD] && pointRDD.rawRDD.count() == 20000) } test("JSON: can you read spheres?") { - val sRDD = new SphereRDD(spark, fns_json, "x,y,z,radius", false) + val options = Map("header" -> "true") + val sRDD = new SphereRDD(spark, fns_json, "x,y,z,radius", false, "json", options) assert(sRDD.isInstanceOf[SphereRDD] && sRDD.rawRDD.count() == 20000) } test("TXT: can you read points?") { - val pointRDD = new Point3DRDD(spark, fn_txt, "Z_COSMO,RA,DEC", true) + val options = Map("sep" -> " ", "header" -> "true") + val pointRDD = new Point3DRDD(spark, fn_txt, "Z_COSMO,RA,DEC", 
true, "csv", options) assert(pointRDD.isInstanceOf[Point3DRDD] && pointRDD.rawRDD.count() == 20000) } test("TXT: can you read spheres?") { - val sRDD = new SphereRDD(spark, fns_txt, "x,y,z,radius", false) + val options = Map("sep" -> " ", "header" -> "true") + val sRDD = new SphereRDD(spark, fns_txt, "x,y,z,radius", false, "csv", options) assert(sRDD.isInstanceOf[SphereRDD] && sRDD.rawRDD.count() == 20000) } - - test("UNKNOWN: can you catch a file extension error (points)?") { - val exception = intercept[AssertionError] { - val pointRDD = new Point3DRDD(spark, fn_wrong, "Z_COSMO,RA,DEC", true) - } - assert(exception.getMessage.contains("I do not understand the file format")) - } - - test("UNKNOWN: can you catch a file extension error (spheres)?") { - val exception = intercept[AssertionError] { - val pointRDD = new SphereRDD(spark, fn_wrong, "x,y,z,radius", false) - } - assert(exception.getMessage.contains("I do not understand the file format")) - } } diff --git a/src/test/scala/com/spark3d/spatial3DRDD/Point3DRDDTest.scala b/src/test/scala/com/spark3d/spatial3DRDD/Point3DRDDTest.scala index 65c3809..8377db1 100644 --- a/src/test/scala/com/spark3d/spatial3DRDD/Point3DRDDTest.scala +++ b/src/test/scala/com/spark3d/spatial3DRDD/Point3DRDDTest.scala @@ -65,7 +65,8 @@ class Point3DRDDTest extends FunSuite with BeforeAndAfterAll { val fn_fits = "src/test/resources/astro_obs.fits" test("Can you repartition a RDD with the onion space?") { - val pointRDD = new Point3DRDD(spark, fn_fits, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDD = new Point3DRDD(spark, fn_fits, "Z_COSMO,RA,DEC", true, "fits", options) // Partition the space using the LINEARONIONGRID val pointRDD_part = pointRDD.spatialPartitioning(GridType.LINEARONIONGRID) @@ -78,7 +79,8 @@ class Point3DRDDTest extends FunSuite with BeforeAndAfterAll { } test("Can you repartition a RDD with the onion space with more partitions?") { - val pointRDD = new Point3DRDD(spark, fn_fits, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDD = new Point3DRDD(spark, fn_fits, "Z_COSMO,RA,DEC", true, "fits", options) // Partition my space with 10 data shells using the LINEARONIONGRID val pointRDD_part = pointRDD.spatialPartitioning(GridType.LINEARONIONGRID, 10) @@ -91,7 +93,8 @@ class Point3DRDDTest extends FunSuite with BeforeAndAfterAll { } test("RDD: Can you construct a Point3DRDD from a RDD[Point3D]?") { - val pointRDD = new Point3DRDD(spark, fn_fits, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDD = new Point3DRDD(spark, fn_fits, "Z_COSMO,RA,DEC", true, "fits", options) val rdd = pointRDD.rawRDD @@ -101,8 +104,9 @@ class Point3DRDDTest extends FunSuite with BeforeAndAfterAll { } test("Can you repartition a RDD from the partitioner of another?") { - val pointRDD1 = new Point3DRDD(spark, fn_fits, 1, "Z_COSMO,RA,DEC", true) - val pointRDD2 = new Point3DRDD(spark, fn_fits, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDD1 = new Point3DRDD(spark, fn_fits, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDD2 = new Point3DRDD(spark, fn_fits, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 10 data shells using the LINEARONIONGRID val pointRDD1_part = pointRDD1.spatialPartitioning(GridType.LINEARONIONGRID, 10) diff --git a/src/test/scala/com/spark3d/spatial3DRDD/SphereRDDTest.scala b/src/test/scala/com/spark3d/spatial3DRDD/SphereRDDTest.scala index 7347bcc..e8bfffb 100644 --- 
a/src/test/scala/com/spark3d/spatial3DRDD/SphereRDDTest.scala +++ b/src/test/scala/com/spark3d/spatial3DRDD/SphereRDDTest.scala @@ -57,7 +57,8 @@ class SphereRDDTest extends FunSuite with BeforeAndAfterAll { val fn_csv_manual = "src/test/resources/cartesian_spheres_manual.csv" test("FITS: Can you repartition a RDD with the octree space?") { - val sphereRDD = new SphereRDD(spark, fn_fits, 1, "x,y,z,radius", false) + val options = Map("hdu" -> "1") + val sphereRDD = new SphereRDD(spark, fn_fits, "x,y,z,radius", false, "fits", options) // Partition the space using the OCTREE val sphereRDD_part = sphereRDD.spatialPartitioning(GridType.OCTREE, 100) @@ -68,7 +69,8 @@ class SphereRDDTest extends FunSuite with BeforeAndAfterAll { } test("CSV: Can you repartition a RDD with the octree space?") { - val sphereRDD = new SphereRDD(spark, fn_csv_manual,"x,y,z,radius", false) + val options = Map("header" -> "true") + val sphereRDD = new SphereRDD(spark, fn_csv_manual,"x,y,z,radius", false, "csv", options) // check the data boundary val dataBoundary = BoxEnvelope.apply(0.0, 4.0, 0.0, 4.0, 0.0, 4.0) @@ -122,7 +124,8 @@ class SphereRDDTest extends FunSuite with BeforeAndAfterAll { } test("RDD: Can you construct a SphereRDD from a RDD[Shell]?") { - val pointRDD = new SphereRDD(spark, fn_csv, "x,y,z,radius", false) + val options = Map("header" -> "true") + val pointRDD = new SphereRDD(spark, fn_csv, "x,y,z,radius", false, "csv", options) val newRDD = new SphereRDD(pointRDD.rawRDD, pointRDD.isSpherical) diff --git a/src/test/scala/com/spark3d/spatialOperator/CenterCrossMatchTest.scala b/src/test/scala/com/spark3d/spatialOperator/CenterCrossMatchTest.scala index 41d0b07..bc20e65 100644 --- a/src/test/scala/com/spark3d/spatialOperator/CenterCrossMatchTest.scala +++ b/src/test/scala/com/spark3d/spatialOperator/CenterCrossMatchTest.scala @@ -68,8 +68,9 @@ class CenterCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B centers and return A?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -85,8 +86,9 @@ class CenterCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B centers and return B?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -102,8 +104,9 @@ class CenterCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B centers and return (A,B)?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", 
options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -119,8 +122,9 @@ class CenterCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you catch an error in center cross match (wrong name)?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -137,8 +141,9 @@ class CenterCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you catch an error in center cross match (different partitioners)?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -153,8 +158,9 @@ class CenterCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you catch an error if epsilon is negative?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) diff --git a/src/test/scala/com/spark3d/spatialOperator/PixelCrossMatchTest.scala b/src/test/scala/com/spark3d/spatialOperator/PixelCrossMatchTest.scala index 32cb627..53025dc 100644 --- a/src/test/scala/com/spark3d/spatialOperator/PixelCrossMatchTest.scala +++ b/src/test/scala/com/spark3d/spatialOperator/PixelCrossMatchTest.scala @@ -66,8 +66,9 @@ class PixelCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B and return A?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -82,8 +83,9 @@ class PixelCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B and return B?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, 
"Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -98,8 +100,9 @@ class PixelCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B and return (A,B)?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -114,8 +117,9 @@ class PixelCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you cross match A and B and return healpix indices?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -130,8 +134,9 @@ class PixelCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you catch an error in pixel cross match (wrong name)?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) @@ -147,8 +152,9 @@ class PixelCrossMatchTest extends FunSuite with BeforeAndAfterAll { test("Can you catch an error in pixel cross match (different partitioners)?") { - val pointRDDA = new Point3DRDD(spark, fnA, 1, "Z_COSMO,RA,DEC", true) - val pointRDDB = new Point3DRDD(spark, fnB, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pointRDDA = new Point3DRDD(spark, fnA, "Z_COSMO,RA,DEC", true, "fits", options) + val pointRDDB = new Point3DRDD(spark, fnB, "Z_COSMO,RA,DEC", true, "fits", options) // Partition 1st RDD with 100 data shells using the LINEARONIONGRID val pointRDDA_part = pointRDDA.spatialPartitioning(GridType.LINEARONIONGRID, 100) diff --git a/src/test/scala/com/spark3d/spatialOperator/RangeQueryTest.scala b/src/test/scala/com/spark3d/spatialOperator/RangeQueryTest.scala index 5f95c2b..f98f0c4 100644 --- a/src/test/scala/com/spark3d/spatialOperator/RangeQueryTest.scala +++ b/src/test/scala/com/spark3d/spatialOperator/RangeQueryTest.scala @@ -65,7 +65,8 @@ class RangeQueryTest extends FunSuite with BeforeAndAfterAll { test("Can you find all points within a given region?") { - val pRDD = new Point3DRDD(spark, fn, 1, "Z_COSMO,RA,DEC", true) + val options = Map("hdu" -> "1") + val pRDD = new Point3DRDD(spark, fn, "Z_COSMO,RA,DEC", true, "fits", options) // Window is a Sphere 
centered on (0.05, 0.05, 0.05) and radius 0.1. val p = new Point3D(0.05, 0.05, 0.05, true) diff --git a/src/test/scala/com/spark3d/spatialOperator/SpatialQueryTest.scala b/src/test/scala/com/spark3d/spatialOperator/SpatialQueryTest.scala index 5461b82..c946569 100644 --- a/src/test/scala/com/spark3d/spatialOperator/SpatialQueryTest.scala +++ b/src/test/scala/com/spark3d/spatialOperator/SpatialQueryTest.scala @@ -55,7 +55,8 @@ class SpatialQueryTest extends FunSuite with BeforeAndAfterAll { val fn_fits = "src/test/resources/cartesian_points.fits" test("Can you find the unique K nearest neighbours?") { - val pointRDD = new Point3DRDD(spark, fn_fits, 1, "x,y,z", false) + val options = Map("hdu" -> "1") + val pointRDD = new Point3DRDD(spark, fn_fits, "x,y,z", false, "fits", options) val queryObject = new Point3D(0.2, 0.2, 0.2, false) // using Octree partitioning val pointRDDPart = pointRDD.spatialPartitioning(GridType.OCTREE, 100) @@ -76,7 +77,8 @@ class SpatialQueryTest extends FunSuite with BeforeAndAfterAll { test("Can you find the K nearest neighbours correctly?") { - val sphereRDD = new SphereRDD(spark, csv_man,"x,y,z,radius", false) + val options = Map("header" -> "true") + val sphereRDD = new SphereRDD(spark, csv_man,"x,y,z,radius", false, "csv", options) val sphereRDD_part = sphereRDD.spatialPartitioning(GridType.OCTREE, 10) val queryObject = new ShellEnvelope(1.0,3.0,3.0,false,0.8)
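To summarize the pattern repeated across all the test and example updates above, here is a minimal before/after sketch of one call site (placeholder session; the test file is from src/test/resources as used throughout this patch):

```scala
import org.apache.spark.sql.SparkSession
import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD

val spark = SparkSession.builder().appName("migration").getOrCreate()
val fn = "src/test/resources/astro_obs.fits"

// Before (spark3D <= 0.1.1): the HDU was a dedicated Int argument
// val pointRDD = new Point3DRDD(spark, fn, 1, "Z_COSMO,RA,DEC", true)

// After (spark3D 0.1.3): a Spark data source name plus DataFrameReader options
val pointRDD = new Point3DRDD(spark, fn, "Z_COSMO,RA,DEC", true, "fits", Map("hdu" -> "1"))
```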