Merge pull request #69 from astrolabsoftware/datasourceAPI
Unify the IO: One constructor to rule them all!
JulienPeloton authored Jul 12, 2018
2 parents 6d1a1b7 + c5b4120 commit 02c8ef6
Showing 17 changed files with 294 additions and 410 deletions.
8 changes: 4 additions & 4 deletions docs/01_installation.md
@@ -20,7 +20,7 @@ another version, feel free to contact us. In addition to Spark, the library has
You can link spark3D to your project (either `spark-shell` or `spark-submit`) by specifying the coordinates:

```bash
toto:~$ spark-submit --packages "com.github.astrolabsoftware:spark3d_2.11:0.1.1" <...>
toto:~$ spark-submit --packages "com.github.astrolabsoftware:spark3d_2.11:0.1.3" <...>
```

It might not contain the latest features though (see *Building from source*).
@@ -69,8 +69,8 @@ result on the screen, plus details of the coverage at
First produce a jar of the spark3D library, and then launch a spark-shell by specifying the external dependencies:

```bash
toto:~$ JARS="target/scala-2.11/spark3d_2.11-0.1.1.jar,lib/jhealpix.jar"
toto:~$ PACKAGES="com.github.astrolabsoftware:spark-fits_2.11:0.4.0"
toto:~$ JARS="target/scala-2.11/spark3d_2.11-0.1.3.jar,lib/jhealpix.jar"
toto:~$ PACKAGES="com.github.astrolabsoftware:spark-fits_2.11:0.6.0"
toto:~$ spark-shell --jars $JARS --packages $PACKAGES
```

Expand All @@ -83,7 +83,7 @@ scala> // etc...
Note that if you make a fat jar (that is, building with `sbt assembly` and not `sbt package`), you do not need to specify external dependencies, as they are already included in the resulting jar:

```bash
toto:~$ FATJARS="target/scala-2.11/spark3D-assembly-0.1.1.jar"
toto:~$ FATJARS="target/scala-2.11/spark3D-assembly-0.1.3.jar"
toto:~$ spark-shell --jars $FATJARS
```

125 changes: 46 additions & 79 deletions docs/02_introduction.md
@@ -59,119 +59,86 @@ val boxes = new BoxEnvelope(p1: Point3D)

## Supported data sources

One of the goals of spark3D is to support as many data sources as possible. Currently, we have focused our efforts on FITS, CSV, JSON, and TXT. While the first is widely used in the astrophysics community, the others are widely used in industry.
One of the goals of spark3D is to support as many data sources as possible. Currently, you can load any Spark DataSource V2 format! That means CSV, JSON, parquet, Avro, ... In addition, you can load scientific data formats that follow the Spark DataSource API, like [FITS](https://github.com/astrolabsoftware/spark-fits), [ROOT](https://github.com/diana-hep/spark-root) (<= 6.11) or [HDF5](https://github.com/LLNL/spark-hdf5)!

In this tutorial we review the steps to create RDDs from 3D data sets. A 3DRDD is simply an RDD whose elements are 3D objects. Currently, spark3D supports 2 kinds of objects: points (`Point3D`) and spheres (`ShellEnvelope`). Note that spheres are a sub-case of shells.

### Loading Point3D

A point is an object with 3 spatial coordinates. In spark3D, you can choose the coordinate system between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a text file (CSV, JSON, or TXT) whose columns are labeled `x`, `y` and `z`, the cartesian coordinates of points:
A point is an object with 3 spatial coordinates. In spark3D, you can choose the coordinate system between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a file containing data vectors labeled `x`, `y` and `z`, the cartesian coordinates of points:

```scala
import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD

// We assume filename contains at least 3 columns whose names are `colnames`
// Order of columns in the file does not matter, as they will be re-arranged
// according to `colnames`.
val pointRDD = new Point3DRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean)
val pointRDD = new Point3DRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean, format: String, options: Map[String, String])
```

With FITS data stored in HDU #1, you would just do
`format` and `options` control the correct reading of your data.

* `format` is the name of the data source as registered in Spark. For example: `csv`, `json`, `org.dianahep.sparkroot`, ... For Spark built-in data sources, see [here](https://github.com/apache/spark/blob/301bff70637983426d76b106b7c659c1f28ed7bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L560).
* `options`: Options to pass to the `DataFrameReader` (see below for examples).

**CSV / JSON**
```scala
import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD
// Spark datasource / You would replace "csv" with "json" for a JSON file
val format = "csv"
// Options to pass to the DataFrameReader - optional
val options = Map("header" -> "true")
```

// We assume hdu#1 of filename contains at least 3 columns whose names are `colnames`
// Order of columns in the file does not matter, as they will be re-arranged
// according to `colnames`.
val hdu = 1
val pointRDD = new Point3DRDD(spark: SparkSession, filename: String, hdu: Int, colnames: String, isSpherical: Boolean)
**TXT**
```scala
// Spark datasource / use csv for a text file with a custom separator
val format = "csv"
// Options to pass to the DataFrameReader - optional
val options = Map("header" -> "true", "sep" -> " ")
```

**FITS**
```scala
// Spark datasource
val format = "fits" // or "com.astrolabsoftware.sparkfits"
// Options to pass to the DataFrameReader - optional
val options = Map("hdu" -> "1")
```

**HDF5**
```scala
// Spark datasource
val format = "hdf5" // or "gov.llnl.spark.hdf"
// Options to pass to the DataFrameReader - optional
val options = Map("dataset" -> "/toto")
```

**ROOT (<= 6.11)**
```scala
// Spark datasource
val format = "org.dianahep.sparkroot"
// Options to pass to the DataFrameReader - optional
val options = Map("" -> "")
```

The resulting RDD is a `RDD[Point3D]`. Note that there must be no space between the column labels in `colnames`.
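
Putting the pieces together, here is a minimal end-to-end sketch for the CSV case (the file name `points.csv` and its header are hypothetical):

```scala
import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("LoadPoints")
  .getOrCreate()

// Hypothetical CSV file with a header and (at least) columns x, y, z
val fn = "points.csv"
val format = "csv"
val options = Map("header" -> "true")

// Cartesian coordinates, hence isSpherical = false
val pointRDD = new Point3DRDD(spark, fn, "x,y,z", false, format, options)
```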

### Loading Sphere

A sphere is defined by its center (3 spatial coordinates) plus a radius.
In spark3D, you can choose the coordinate system of the center between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a text file (CSV, JSON, or TXT) whose columns are labeled `r`, `theta`, `phi`, the spherical coordinates and `radius`:
In spark3D, you can choose the coordinate system of the center between cartesian `(x, y, z)` and spherical `(r, theta, phi)`. Let's suppose we have a file containing data vectors labeled `r`, `theta`, `phi` (the spherical coordinates) and `radius`. Similarly to `Point3DRDD`, you would use:

```scala
import com.astrolabsoftware.spark3d.spatial3DRDD.SphereRDD

// We assume filename contains at least 4 columns whose names are `colnames`.
// Order of columns in the file does not matter, as they will be re-arranged
// according to `colnames`.
val pointRDD = new SphereRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean)
val sphereRDD = new SphereRDD(spark: SparkSession, filename: String, colnames: String, isSpherical: Boolean, format: String, options: Map[String, String])
```

The resulting RDD is a `RDD[Sphere]`.
The resulting RDD is a `RDD[ShellEnvelope]`.
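
For instance, reusing the sphere catalogue shipped in the test resources (the same file appears in the partitioning examples below), and assuming a running `SparkSession` named `spark`:

```scala
import com.astrolabsoftware.spark3d.spatial3DRDD.SphereRDD

// Data is in src/test/resources
val fn = "cartesian_spheres.fits"
// Center coordinates and radius
val columns = "x,y,z,radius"
// Centers are given in cartesian coordinates
val spherical = false
val format = "fits"
val options = Map("hdu" -> "1")

val sphereRDD = new SphereRDD(spark, fn, columns, spherical, format, options)
```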

### Loading Shells and Boxes

TBD.

### Loading data from a different data source

Since the scientific domain considered here is mostly astrophysics,
the natural storage or exchange file format is FITS.
Therefore we consider as part of the problem the possibility of injecting FITS files
directly into the HDFS infrastructure, so as to develop Spark-based applications. The usual [cfitsio](https://heasarc.gsfc.nasa.gov/fitsio/fitsio.html) library, as well as the FITS I/O format, are not adapted to a distributed file system such as HDFS.
Therefore we have to develop low-level Reader/Writer services
to support direct access to FITS data, without any need for copies or conversions.
To tackle this challenge, we started a new project called
[spark-fits](https://github.com/astrolabsoftware/spark-fits), which provides a
Spark connector for FITS data, and a Scala library for manipulating FITS files.

We plan to release more in the future (HDF5 and ROOT are on the TODO list!), and you are welcome to submit requests for specific data sources!
Alternatively, you can define your own routine to read your data. Add a new routine in `Loader.scala`:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

/**
  * Your doc is important
  */
def Point3DRDDFromMySource(spark : SparkSession, filename : String, colnames : String,
    isSpherical: Boolean, <other_options>): RDD[Point3D] = {

  // Read the data using your data source
  val df = spark.read.format(...).options(...).load(filename)

  // Grab the column names
  val csplit = colnames.split(",")

  // Select the 3 columns (x, y, z)
  // and cast them to double if needed.
  val rawRDD = df.select(
    col(csplit(0)).cast("double"),
    col(csplit(1)).cast("double"),
    col(csplit(2)).cast("double")
  )
  // DF to RDD
  .rdd
  // map to Point3D
  .map(x => new Point3D(
    x.getDouble(0), x.getDouble(1), x.getDouble(2), isSpherical)
  )

  rawRDD
}
```

and then update `Point3DRDD.scala`:

```scala
/**
* Your doc is important
*/
def this(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean, <other_options>) {
  this(Point3DRDDFromMySource(spark, filename, colnames, isSpherical, <other_options>), isSpherical)
}
```

and finally load your data using:

```scala
val myPoints = new Point3DRDD(spark : SparkSession, filename : String, colnames : String, isSpherical: Boolean, <other_options>)
```

Here the example makes use of the DataFrame API, but you can use the RDD API as well to read your data, as sketched below.
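
For reference, a minimal sketch of the same loader written against the RDD API, assuming a plain text file with one point per line and whitespace-separated columns (the helper name `Point3DRDDFromTextFile` is hypothetical):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Hypothetical loader reading a plain text file with the RDD API.
  * Assumes one point per line, columns separated by whitespace.
  */
def Point3DRDDFromTextFile(spark: SparkSession, filename: String,
    isSpherical: Boolean): RDD[Point3D] = {

  spark.sparkContext.textFile(filename)
    // Split each line into columns
    .map(_.trim.split("\\s+"))
    // Keep only well-formed rows with at least 3 columns
    .filter(_.length >= 3)
    // Map the first three columns to a Point3D
    .map(p => new Point3D(p(0).toDouble, p(1).toDouble, p(2).toDouble, isSpherical))
}
```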
14 changes: 8 additions & 6 deletions docs/03_partitioning.md
@@ -36,12 +36,13 @@ val spark = SparkSession.builder()

// Data is in src/test/resources
val fn = "astro_obs.fits"
val hdu = 1
val columns = "Z_COSMO,RA,DEC"
val spherical = true
val format = "fits" // "com.astrolabsoftware.sparkfits"
val options = Map("hdu" -> "1")

// Load the data
val pointRDD = new Point3DRDD(spark, fn, hdu, columns, spherical)
val pointRDD = new Point3DRDD(spark, fn, columns, spherical, format, options)

// nPart is the wanted number of partitions. Default is pointRDD partition number.
val pointRDD_partitioned = pointRDD.spatialPartitioning(GridType.LINEARONIONGRID, nPart)
@@ -53,10 +54,10 @@ val pointRDD_partitioned = pointRDD.spatialPartitioning(GridType.LINEARONIONGRID

### Octree Partitioning

In the following example, we load `Point3D` data, and we re-partition it with the octree partitioning
In the following example, we load `ShellEnvelope` data (spheres), and we re-partition it with the octree partitioning

```scala
import com.astrolabsoftware.spark3d.spatial3DRDD.Point3DRDD
import com.astrolabsoftware.spark3d.spatial3DRDD.SphereRDD
import com.astrolabsoftware.spark3d.utils.GridType

import org.apache.spark.sql.SparkSession
Expand All @@ -67,12 +68,13 @@ val spark = SparkSession.builder()

// Data is in src/test/resources
val fn = "cartesian_spheres.fits"
val hdu = 1
val columns = "x,y,z,radius"
val spherical = false
val format = "fits" // com.astrolabsoftware.sparkfits
val options = Map("hdu" -> "1")

// Load the data
val sphereRDD = new Point3DRDD(spark, fn, hdu, columns, spherical)
val sphereRDD = new SphereRDD(spark, fn, columns, spherical, format, options)

// nPart is the wanted number of partitions (floored to a power of 8).
// Default is sphereRDD partition number.
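
// A sketch of the partitioning call itself (elided in this diff excerpt),
// assuming the octree grid is exposed as GridType.OCTREE: requesting
// e.g. 100 partitions would be floored to 8^2 = 64.
val sphereRDD_part = sphereRDD.spatialPartitioning(GridType.OCTREE, 100)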
12 changes: 7 additions & 5 deletions docs/04_query.md
@@ -26,12 +26,13 @@ val spark = SparkSession.builder()

// Data files are in src/test/resources
val fn = "astro_obs.fits"
val hdu = 1
val columns = "Z_COSMO,RA,DEC"
val spherical = true
val format = "fits" // "com.astrolabsoftware.sparkfits"
val options = Map("hdu" -> "1")

// Load a RDD[Point3D] from the FITS file
val objects = new Point3DRDD(spark, fn, hdu, columns, spherical)
val objects = new Point3DRDD(spark, fn, columns, spherical, format, options)

// Define our envelope, here a sphere.
val center = new Point3D(0.9, 0.0, 0.0, spherical)
@@ -64,13 +65,14 @@ val spark = SparkSession.builder()
// Data files are in src/test/resources
val fnA = "astro_obs.fits"
val fnB = "astro_obs2.fits"
val hdu = 1
val columns = "Z_COSMO,RA,DEC"
val spherical = true
val format = "fits" // "com.astrolabsoftware.sparkfits"
val options = Map("hdu" -> "1")

// Load the two data sets
val setA = new Point3DRDD(spark, fnA, hdu, columns, spherical)
val setB = new Point3DRDD(spark, fnB, hdu, columns, spherical)
val setA = new Point3DRDD(spark, fnA, columns, spherical, format, options)
val setB = new Point3DRDD(spark, fnB, columns, spherical, format, options)
```

By default, the two sets are partitioned randomly (in the sense that points which are spatially close are probably not in the same partition).
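
To co-partition the two sets, a minimal sketch could look as follows; re-using the partitioner of `setA` for `setB`, and the `SpatialPartitioner` import path, are assumptions on the spark3D API (check the `spatialPartitioning` overloads for the exact signature):

```scala
import com.astrolabsoftware.spark3d.utils.GridType
// Assumed import path for the partitioner class
import com.astrolabsoftware.spark3d.spatialPartitioning.SpatialPartitioner

// Partition setA with an onion grid, e.g. 100 partitions
val setA_part = setA.spatialPartitioning(GridType.LINEARONIONGRID, 100).cache()

// Re-partition setB with the partitioner of setA (assumed API),
// so that spatially close points end up in matching partitions
val setB_part = setB.spatialPartitioning(
  setA_part.partitioner.get.asInstanceOf[SpatialPartitioner]).cache()
```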
7 changes: 4 additions & 3 deletions src/main/scala/com/spark3d/examples/CrossMatch.scala
@@ -76,7 +76,7 @@ object CrossMatch {
val fnB_fits = args(1).toString

// HDU index
val hdu = args(2).toInt
val hdu = args(2).toString

// Columns to load
val columns = args(3).toString
Expand All @@ -91,8 +91,9 @@ object CrossMatch {
val kind = args(6).toString

// Load the data as Point3DRDD
val pointRDDA = new Point3DRDD(spark, fnA_fits, hdu, columns, true)
val pointRDDB = new Point3DRDD(spark, fnB_fits, hdu, columns, true)
val options = Map("hdu" -> hdu)
val pointRDDA = new Point3DRDD(spark, fnA_fits, columns, true, "fits", options)
val pointRDDB = new Point3DRDD(spark, fnB_fits, columns, true, "fits", options)

// Re-partition the space and cache the result
val pointRDD_partA = pointRDDA.spatialPartitioning(
3 changes: 2 additions & 1 deletion src/main/scala/com/spark3d/examples/OnionSpace.scala
@@ -67,8 +67,9 @@ object OnionSpace {
val fn_fits = args(0).toString

// Load the data as Point3DRDD
val options = Map("hdu" -> args(1).toString)
val pointRDD = new Point3DRDD(
spark, fn_fits, args(1).toInt, args(2).toString, true)
spark, fn_fits, args(2).toString, true, "fits", options)

// Count the number of partition before, and number of elements per partition
val partitionsBefore = pointRDD.rawRDD.mapPartitions(