From ef86df308452b4f47a51ee30c1d7217036c95792 Mon Sep 17 00:00:00 2001 From: Julien Date: Fri, 8 Feb 2019 13:23:20 +0100 Subject: [PATCH 1/3] Add supplementary notebook for 1807.03078 --- Spark4Physicists.ipynb | 745 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 745 insertions(+) create mode 100644 Spark4Physicists.ipynb diff --git a/Spark4Physicists.ipynb b/Spark4Physicists.ipynb new file mode 100644 index 0000000..4c79200 --- /dev/null +++ b/Spark4Physicists.ipynb @@ -0,0 +1,745 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Analysing billion-objects catalogue interactively: Spark for physicists\n", + "
Authors: **S. Plaszczynski, J. Peloton, C. Arnault and J.E. Campagne **\n", + "
***LAL, Univ. Paris-Sud, CNRS/IN2P3, Universite Paris-Saclay, Orsay, France***\n", + "
Last Verified to Run: **2019-02-07**\n", + "\n", + "\n", + "\n", + "This jupyter notebook describes the analysis presented in https://arxiv.org/abs/1807.03078\n", + "The results presented here were run on the Universite Paris-Sud Spark cluster: https://www.informatique-scientifique.u-psud.fr/services/spark.html\n", + "\n", + "In order to run this notebook you need:\n", + "- a cluster running `pypark` (spark version >=2.2) on which the following python libraries have been installed (on each executor):\n", + "`matplotlib`,`numpy`,`pandas`,`pyarrow`,`healpy`\n", + "- a spark jupyter kernel (NERSC users: https://github.com/astrolabsoftware/spark-kernel-nersc)\n", + "- the `SparkFITS` library : https://astrolabsoftware.github.io/spark-fits/\n", + "- a set of FITS files, as produced for instance by the `CoLoRe` simulation : https://github.com/damonge/CoLoRe\n", + "The directory that points to these files should be put in the FITSDIR environment variable. Any file(s) with (RA,DEC,redshift) coordinates may also be used with little adaptation to match the exact key names.\n", + "\n", + "For testing, Spark (and these dependencies) can also be installed on a personal computer https://spark.apache.org/docs/latest/.\n", + "\n", + "For LSST/DESC collaborators with a NERSC account see https://github.com/LSSTDESC/desc-spark (FITSDIR=/global/cscratch1/sd/plaszczy/LSST10Y)\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Initialisation" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Spark session started\n" + ] + } + ], + "source": [ + "from pyspark.sql import SparkSession\n", + "from pyspark import StorageLevel\n", + "from pyspark.sql import functions as F\n", + "from pyspark.sql.functions import randn\n", + "from pyspark.sql.types import IntegerType,FloatType\n", + "from pyspark.sql.functions import pandas_udf, PandasUDFType\n", + "\n", + "spark = SparkSession.builder.getOrCreate()\n", + "sc=spark.sparkContext\n", + "print(\"Spark session started\")\n", + "\n", + "#a class for timing\n", + "from time import time\n", + "class Timer:\n", + " \"\"\"\n", + " a simple class for printing time (s) since last call\n", + " \"\"\"\n", + " def __init__(self):\n", + " self.t0=time()\n", + " \n", + " def start(self):\n", + " self.t0=time()\n", + " \n", + " def stop(self):\n", + " t1=time()\n", + " print(\"{:2.1f}s\".format(t1-self.t0))\n", + "\n", + "timer=Timer()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Reading the data \n", + "\n", + "all CoLoRe FITS files assumed to be in a directory pointed by the FITSDIR env variable\n", + "Le us build the **gal** datafreame selecting only the \"RA\" and \"dec\" columns and building a new redshift column called \"z\" on the flight." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "6.1s\n" + ] + } + ], + "source": [ + "import os\n", + "dirfits=os.environ['FITSDIR']\n", + "timer.start()\n", + "gal=spark.read.format(\"fits\").option(\"hdu\",1)\\\n", + " .load(dirfits)\\\n", + " .select(F.col(\"RA\"), F.col(\"Dec\"), (F.col(\"Z_COSMO\")+F.col(\"DZ_RSD\")).alias(\"z\"))\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Pretty fast isn't it? This is because of lazy evaluation : data have not been actaully loaded (yet). The pipeline (Direct Acyclic Graph, DAG) has been updated but its execution has not been triggered since read/load/slect are not actions but transformations.\n", + "At this level the schema of the data is however available so you can looks at it" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- RA: float (nullable = true)\n", + " |-- Dec: float (nullable = true)\n", + " |-- z: float (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "gal.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "you may want to look at a few values to check eveything is OK. Let's print the first 5" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+---------+---------+\n", + "| RA| Dec| z|\n", + "+---------+---------+---------+\n", + "|225.80168|18.519966|2.4199903|\n", + "|225.73839|18.588171|2.4056022|\n", + "|225.79999|18.635067| 2.396816|\n", + "|225.49783|18.570776|2.4139786|\n", + "|225.57983|18.638515|2.3995044|\n", + "+---------+---------+---------+\n", + "only showing top 5 rows\n", + "\n", + "14.1s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "gal.show(5)\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Still pretty fast. Here show is an action and has triggered the data access.But not the full data set : since we required only the first 5 rows, the DAG was (automatically) analyzed and only the first block was actually read" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we add an extra column that performs gaussian smearing on the \"z\" column. We use the Spark \"rand\" function that is highly optimized" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+---------+---------+---------+\n", + "| RA| Dec| z| zrec|\n", + "+---------+---------+---------+---------+\n", + "|225.80168|18.519966|2.4199903|2.3029215|\n", + "|225.73839|18.588171|2.4056022|2.6602228|\n", + "|225.79999|18.635067| 2.396816|2.3686476|\n", + "|225.49783|18.570776|2.4139786|2.4345846|\n", + "|225.57983|18.638515|2.3995044|2.3599765|\n", + "+---------+---------+---------+---------+\n", + "only showing top 5 rows\n", + "\n", + "8.3s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "gal=gal.withColumn(\"zrec\",(gal.z+0.03*(1+gal.z)*randn()).astype('float'))\n", + "gal.show(5)\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we have all our columns ready, we put data in cache. Recall that the amount of memory available for the cache is about 60% of your total executors memory. You may simply use the cache() function. Here in case you do not have enough memory we use a slighlty more evolved form that fills the cache and spills the rest over disk if necessary.\n", + "Remember to finalize your command with an actiom as count() here.\n", + "This is the lenghty part but you need to perform it only once." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "#galaxies=5926764520\n", + "97.6s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "print(\"#galaxies={}\".format(gal.persist(StorageLevel.MEMORY_AND_DISK).count()))\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We put about 6 billions of galaxies in cache, just as if we had a hge RAM. All further computattions will be boosted." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Basic statistics" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Le us first look at some simple statistics on a single column" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+-------------------+\n", + "|summary| z|\n", + "+-------+-------------------+\n", + "| count| 5926764520|\n", + "| mean| 0.875229446764415|\n", + "| stddev|0.47360539212211805|\n", + "| min| -5.93947E-4|\n", + "| max| 2.4352543|\n", + "+-------+-------------------+\n", + "\n", + "5.6s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "gal.describe(['z']).show()\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "On all the columns:" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+------------------+--------------------+------------------+-------------------+\n", + "|summary| RA| Dec| z| zrec|\n", + "+-------+------------------+--------------------+------------------+-------------------+\n", + "| count| 5926764520| 5926764520| 5926764520| 5926764520|\n", + "| mean|179.99419917017775|-0.00314190695924...|0.8752294467644149| 0.8752288274953041|\n", + "| stddev|103.92702755739798| 39.17648956108155|0.4736053921221175|0.47714711978268487|\n", + "| min| 4.9597503E-8| -89.997185| -5.93947E-4| -0.124362685|\n", + "| max| 360.0| 89.9986| 2.4352543| 2.95336|\n", + "+-------+------------------+--------------------+------------------+-------------------+\n", + "\n", + "9.2s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "gal.describe().show()\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you just need some specific values it will be more efficient to call them directly" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "min=-0.0005939470138400793 max=2.4352543354034424\n", + "4.5s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "minmax=gal.select(F.min(\"z\"),F.max(\"z\")).first()\n", + "print(\"min={} max={}\".format(minmax[0],minmax[1]))\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Histograms" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The distributed way of building histograms is:\n", + "- add a column of bin values\n", + "- groupBy this column and count the number of elements in each group\n", + "- you may then drop the bin and add the center of the bin locations." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## with Spark native functions\n", + "Here is how to do it using Spark dataframe functions" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "#histogram caracteristics\n", + "zmin=minmax[0]\n", + "zmax=minmax[1]\n", + "Nbins=100\n", + "dz=(zmax-zmin)/Nbins\n", + "\n", + "#create the bin number column\n", + "zbin=gal.select(gal.z,((gal['z']-zmin-dz/2)/dz).cast(IntegerType()).alias('bin'))\n", + "# groupBy it + count + sort by bin values\n", + "h=zbin.groupBy(\"bin\").count().orderBy(F.asc(\"bin\"))\n", + "# add the bin locations (zbin) and drop the bin numer\n", + "h=h.select(\"bin\",(zmin+dz/2+h['bin']*dz).alias('loc'),\"count\").drop(\"bin\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now data is reduced to a simple histogram, we go back to the standard python world by converting the dataframe to a pandas object (this is the action so triggers the DAG materialization)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Index(['loc', 'count'], dtype='object') size=100\n", + "13.0s\n" + ] + } + ], + "source": [ + "timer.start()\n", + "p=h.toPandas()\n", + "print(p.columns,\"size={}\".format(p.index.size))\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let us plot the result with matplotlib" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "iVBORw0KGgoAAAANSUhEUgAAAXcAAAERCAYAAACAbee5AAAABHNCSVQICAgIfAhkiAAAAAlwSFlzAAALEgAACxIB0t1+/AAAADl0RVh0U29mdHdhcmUAbWF0cGxvdGxpYiB2ZXJzaW9uIDIuMi4yLCBodHRwOi8vbWF0cGxvdGxpYi5vcmcvhp/UCwAAEuZJREFUeJzt3W2MZOdZ5vH/xcR2JCcEwQwi8oynDRjBECU4aTmBwGJIFtn+YH8goLEIYOQw4sXhJQjkBeRE5hNBu2jZdchOwDJEEBMCggFNMIIYBQGO3CHB2I4MgzONW05w581ZZHDw7s2HrrbL7eqpM91Vdaqe+v+kVqrqnKm5z9Tk8j33OeepVBWSpLZ8Ud8FSJImz3CXpAYZ7pLUIMNdkhpkuEtSgwx3SWpQr+Ge5I4kjyd5oMO+lya5J8lHktyf5NpZ1ChJi6jvzv1O4OqO+/488N6qugI4DrxjWkVJ0qLrNdyr6oPAZ4ZfS/JVSf4kyYeT/GWSr93eHfjiweOXAI/NsFRJWigv6LuAEU4CP1RV/5jk1Wx16N8OvA340yRvBi4GXt9fiZI03+Yq3JO8CPgm4HeTbL980eB/bwDurKr/nuQbgXcneVlV/f8eSpWkuTZX4c7WmOhzVfUNI7bdxGA+X1V/k+SFwEHg8RnWJ0kLoe8Tqs9RVZ8HPp7kuwCy5RWDzf8MvG7w+tcBLwQ2eylUkuZc+lwVMsl7gKvY6sD/BXgr8AHgV4GXAhcAd1XVbUmOAe8CXsTWydWfqao/7aNuSZp3vYa7JGk65mosI0majN5OqB48eLBWVlb6+u0laSF9+MMf/lRVHRq3X2/hvrKywtraWl+/vSQtpCTrXfZzLCNJDTLcJalBhrskNchwl6QGGe6S1CDDXZIaZLhLUoMMd0lqkOEuSQ0y3BfcysoKSUjCxRdfPPLxzucu+yC1b96+rEMdrKyssL7+7B3I2yt7Jhn5eNQ2SW2zc19A6+vrVBV7Xa75oosusqOXGmfnvoSeeuqpXbt9SW2wc9czhjt6u3hpsdm5L4idc/Zp2NnRS1pcdu4LYr9zdknLxXCXpAYZ7nNs+Br2WXP+Li02Z+5zbHsUA7OfgTt/lxabnbskNchw11iOaKTF41hmzsziksfz5YhGWjx27nPGSx4lTYLhLkkNGhvuSe5I8niSB3bZ/j1J7h/8/HWSV0y+TM0L5+/SYujSud8JXH2O7R8HvrWqXg78AnByAnVpTm3P36tq7s4NSHrW2BOqVfXBJCvn2P7XQ0/vBQ7vvyxJ0n5MeuZ+E/D+3TYmOZFkLcna5ubmhH/rxdXnnaj74YhGml8TuxQyybexFe7fvNs+VXWSwdhmdXXVy0EG+rwTdT+8RFKaXxMJ9yQvB34NuKaqPj2J95Qk7d2+xzJJLgV+H/jeqvqH/ZckSdqvsZ17kvcAVwEHk2wAbwUuAKiqdwK3Al8GvGPwT/Onq2p1WgVLksbrcrXMDWO2vwl408QqWhLzuMzAfmyfXAU4evQoZ8+e7bcgacm5tkxPFvUk6m48uSrNF5cfkKQGGe6S1CDDXRPnzU1S/5y5a+Kcv0v9s3OfoUVdZkDS4jHcZ8gv4pA0K4a7psr5u9QPZ+6aKufvUj/s3CWpQYa7JDXIcNfMOH+XZseZu2bG+bs0O3buktQgw12SGmS4S1KDDPcpc8mB0Ty5Kk2XJ1SnrLUv5ZgUT65K02XnLkkNMtwlqUGGu3rn/F2aPGfu6p3zd2nyxnbuSe5I8niSB3bZniS/kuRMkvuTvHLyZUqSzkeXscydwNXn2H4NcPng5wTwq/svS5K0H2PDvao+CHzmHLtcD/xmbbkX+JIkL51UgYvIa9sl9W0SJ1QvAR4der4xeG1p+XV6kvo2iXAf1Z6OTLUkJ5KsJVnb3NycwG+t1njljDQZk7haZgM4MvT8MPDYqB2r6iRwEmB1ddW2Vs/jlTPSZEyicz8FfN/gqpnXAE9U1Scm8L6SpD0a27kneQ9wFXAwyQbwVuACgKp6J3AauBY4AzwJ/MC0ipUkdTM23KvqhjHbC/jRiVUkDWzP3wGOHj3K2bNn+y1IWiDeoaq55fxd2jvXlpGkBhnuktQgw12SGmS4ayF4c5N0fjyhOiErKyusr6/3XUazPLkqnR879wlxPRlJ88Rwl6QGGe6S1CDDXQvHk6vSeJ5Q1cLx5Ko0np27JDXIcJekBhnuktQgw10LzZOr0mieUN0H70rtnydXpdHs3PfBu1IlzSvDXZIaZLhLUoMMd0lqkOGuZnjljPQsr5ZRM7xyRnqWnbskNahTuCe5OsnDSc4kuWXE9kuT3JPkI0nuT3Lt5EuVJHU1NtyTHABuB64BjgE3JDm2Y7efB95bVVcAx4F3TLpQ6Xw4f9ey6zJzvxI4U1WPACS5C7geeGhonwK+ePD4JcBjkyxSOl/O37XsuoxlLgEeHXq+MXht2NuANybZAE4Dbx71RklOJFlLsra5ubmHcvu3srLyTEcoSfOqS7iPSrGd99vfANxZVYeBa4F3J3nee1fVyapararVQ4cOnX+1c8AlByQtgi7hvgEcGXp+mOePXW4C3gtQVX8DvBA4OIkCJUnnr0u43wdcnuSyJBeydcL01I59/hl4HUCSr2Mr3Bdz7qLmeHJVy2jsCdWqejrJzcDdwAHgjqp6MMltwFpVnQJ+CnhXkp9ka2RzYzm30Jzw5KqWUac7VKvqNFsnSodfu3Xo8UPAaydbmiRpr7xDVZIaZLhLUoMMdy2V4ZOrnmBVy1wVUktl+OQqeIJV7bJzl6QGGe6S1CDDXUvNG5zUKmfuWmre4KRW2bl34EqQkhaN4d6BK0FKWjSGuyQ1yHCXBjy5qpZ4QlUa8OSqWmLnLkkNMtwlqUGGuzSC83ctOmfu0gjO37Xo7NwlqUGGuyQ1yHCXpAYZ7tIYnlzVIvKEqjSGJ1e1iDp17kmuTvJwkjNJbtlln+9O8lCSB5P89mTLnD1XgpS0yMZ27kkOALcD/xXYAO5LcqqqHhra53LgvwGvrarPJvnyaRU8K9srQYLdmqTF06VzvxI4U1WPVNUXgLuA63fs84PA7VX1WYCqenyyZUrzwfm7FkWXmfslwKNDzzeAV+/Y52sAkvwVcAB4W1X9yUQqlOaI83ctii7hPupv8M5vrXgBcDlwFXAY+MskL6uqzz3njZITwAmASy+99LyLlSR102UsswEcGXp+GHhsxD5/WFX/UVUfBx5mK+yfo6pOVtVqVa0eOnRorzVLksboEu73AZcnuSzJhcBx4NSOff4A+DaAJAfZGtM8MslCpXnj/F3zbOxYpqqeTnIzcDdb8/Q7qurBJLcBa1V1arDtO5I8BPw/4Ker6tPTLFzqm/N3zbNONzFV1Wng9I7Xbh16XMBbBj+SpJ65/IAkNchwlybA+bvmjWvLSBPg/F3zxs5dkhpkuEtSgwz3Ia4EqUlw/q554Mx9iCtBahKcv2se2LlLUoMMd2mKHNGoL45lpClyRKO+2LlLUoMMd0lqkOEuzYjzd82SM3dpRpy/a5bs3CWpQYa7JDXIcJd64Pxd0+bMXeqB83dNm527JDVo6cPdlSDVN0c0moalH8u4EqT65ohG07D0nbsktchwl+aIIxpNSqdwT3J1koeTnElyyzn2e0OSSrI6uRKl5bE9oqkq1tfX+y5HC2xsuCc5ANwOXAMcA25IcmzEfi8Gfgz40KSLlCSdny6d+5XAmap6pKq+ANwFXD9iv18A3g78+wTrkyTtQZdwvwR4dOj5xuC1ZyS5AjhSVX98rjdKciLJWpK1zc3N8y5WWibO37UfXS6FHHVtVj2zMfki4JeBG8e9UVWdBE4CrK6u1pjdpaXmJZLajy6d+wZwZOj5YeCxoecvBl4G/EWSs8BrgFOeVJWk/nQJ9/uAy5NcluRC4DhwantjVT1RVQeraqWqVoB7geuqam0qFUtLyBGNztfYsUxVPZ3kZuBu4ABwR1U9mOQ2YK2qTp37HSTtlyMana9Oyw9U1Wng9I7Xbt1l36v2X5ak3Wx38QBHjx7l7Nmz/RakubT0a8tIi8YuXl0s5fIDrgQpqXVLGe7bK0Fudz/SovJEq3bjWEZaYI5otJul7NwlqXWGu9QIRzQa5lhGaoQjGg2zc5ekBhnuUoMc0cixjNQgRzSyc5caZxe/nOzcpcbZxS8nO3dJapDhLi0RRzTLw7GMtEQc0SyPpencXQlSei67+LYtTee+vRIk2LFIYBffuqXp3CXtbriLt5Nvw9J07pJ2N9zFg518C+zcJT2P8/jFZ+cu6Xmcxy8+O3dJ52QXv5g6hXuSq5M8nORMkltGbH9LkoeS3J/kz5McnXypkvqw3cVXFevr632Xo47GhnuSA8DtwDXAMeCGJMd27PYRYLWqXg68D3j7pAuV1D+7+MXRZeZ+JXCmqh4BSHIXcD3w0PYOVXXP0P73Am+cZJGS5oOz+MXRZSxzCfDo0PONwWu7uQl4/6gNSU4kWUuytrm52b1KSXPHLn6+dQn3Uf95rhGvkeSNwCrwS6O2V9XJqlqtqtVDhw51r1LS3BmexX/yk5806OdMl7HMBnBk6Plh4LGdOyV5PfBzwLdW1VOTKU/SInBcM3+6dO73AZcnuSzJhcBx4NTwDkmuAP4PcF1VPT75MvfGxcKk2XNcMx/GhntVPQ3cDNwNfAx4b1U9mOS2JNcNdvsl4EXA7yb5aJJTu7zdTG0vFjZ8W7Wk6XJcMx863aFaVaeB0zteu3Xo8esnXJekBjiu6Y93qEqaCcc1s2W4S5oJxzWz5cJhkmbOcc302blL6pXjmukw3CX1ynHNdBjukuaGQT85hrukuWTQ709z4e5dqVJ7DPrz11y4e1eq1DaDvpvmwl3S8jDod2e4S2qCQf9chruk5hj0hrukxi1r0BvukpbGMgW94S5pKe0W9BdffHEToW+4S1p6w0H/5JNPjgz9RQt7w12SdjEc+ovW4S98uA/fkepdqZKmqUuHPy+hv/DhPnxHqnelSupDl9CfddD7ZR2SNCV9finJwnfukqTnM9wlqUGGuyQ1qFO4J7k6ycNJziS5ZcT2i5L8zmD7h5KsTLpQSVJ3Y8M9yQHgduAa4BhwQ5JjO3a7CfhsVX018MvAL0660GF+IYcknVuXzv1K4ExVPVJVXwDuAq7fsc/1wG8MHr8PeF2mmLx+IYekRXPRRRfN9LLILpdCXgI8OvR8A3j1bvtU1dNJngC+DPjU8E5JTgAnBk//NcnDeykaOJjkmffe+d+R4ecNPj4IfOp8f+0c1L3fx7t+5vt5PCfHNpXPvIE/l6l85n3+uWxbX18fN3k4yI78HHL0XL9wW5dwH1XBzpa5yz5U1UngZIff89wFJWtVtbrf91lEy3rsy3rcsLzHvqzHDZM59i5jmQ3gyNDzw8Bju+2T5AXAS4DP7KcwSdLedQn3+4DLk1yW5ELgOHBqxz6ngO8fPH4D8IFyIC5JvRk7lhnM0G8G7gYOAHdU1YNJbgPWquoU8OvAu5OcYatjPz7NopnAaGeBLeuxL+txw/Ie+7IeN0xifG2DLUnt8Q5VSWqQ4S5JDZrrcM8SL3vQ4dhvTLKZ5KODnzf1UeekJbkjyeNJHthle5L8yuDP5f4kr5x1jdPQ4bivSvLE0Od966xrnIYkR5Lck+RjSR5M8uMj9mn1M+9y7Hv/3Ie/6GKeftg6eftPwFcCFwJ/Bxzbsc+PAO8cPD4O/E7fdc/w2G8E/nfftU7h2P8L8ErggV22Xwu8n617K14DfKjvmmd03FcBf9x3nVM47pcCrxw8fjHwDyP+rrf6mXc59j1/7vPcuc/dsgcz1OXYm1RVH+Tc90hcD/xmbbkX+JIkL51NddPT4bibVFWfqKq/HTz+v8DH2LrjfVirn3mXY9+zeQ73Ucse7Dzw5yx7AGwve7Douhw7wHcO/pn6viRHRmxvUdc/mxZ9Y5K/S/L+JF/fdzGTNhirXgF8aMem5j/zcxw77PFzn+dwn9iyBwuoy3H9EbBSVS8H/oxn/wXTulY/83H+FjhaVa8A/hfwBz3XM1FJXgT8HvATVfX5nZtH/JJmPvMxx77nz32ew32Zlz0Ye+xV9emqemrw9F3Aq2ZUW9+6/L1oTlV9vqr+dfD4NHBBkoM9lzURSS5gK9x+q6p+f8QuzX7m4459P5/7PIf7Mi97MPbYd8wcr2NrXrcMTgHfN7iC4jXAE1X1ib6LmrYkX7F9PinJlWz9f/fT/Va1f4Nj+nXgY1X1P3bZrcnPvMux7+dz77IqZC9qPpc9mImOx/5jSa4Dnmbr2G/sreAJSvIetq4QOJhkA3grcAFAVb0TOM3W1RNngCeBH+in0snqcNxvAH44ydPAvwHHG2lkXgt8L/D3ST46eO1ngUuh7c+cbse+58/d5QckqUHzPJaRJO2R4S5JDTLcJalBhrskNchwl6QGGe6S1CDDXZIaZLhLA0l+aGjd7I8nuafvmqS98iYmaYfBeh8fAN5eVX/Udz3SXti5S8/3P9lap8hg18Ka27VlpD4kuRE4CtzccynSvjiWkQaSvIqtdfG/pao+23c90n44lpGedTPwpcA9g5Oqv9Z3QdJe2blLUoPs3CWpQYa7JDXIcJekBhnuktQgw12SGmS4S1KDDHdJatB/Aq3INiHIhMzLAAAAAElFTkSuQmCC\n", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "%matplotlib inline\n", + "import matplotlib.pyplot as plt\n", + "plt.bar(p['loc'].values,p['count'].values,dz,color='white',edgecolor='black')\n", + "plt.xlabel(\"z\");" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## With a User Defined Function (udf)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Although absolutely unnecessary here, we illustrate how one could have plugged an external UDF (to compute the bin number. Performances are extrememly dowgraded so you should NOT do it this way (see next part)" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "111.3s\n" + ] + } + ], + "source": [ + "#define your UDF (here just binning)\n", + "binNumber_udf=F.udf(lambda z: int((z-zmin)/dz))\n", + "timer.start()\n", + "#create the bin number column\n", + "p_udf=gal.select(gal.z,binNumber_udf(gal.z).alias('bin')).groupBy(\"bin\").count().orderBy(F.asc(\"bin\")).toPandas()\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "That's not great.Let see how to be more efficient" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## With pandas_udf" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The recommended way to bind Spark to external python functions is through pandas_udf which achieve some level of vectorization the previous method miss." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "46.1s\n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "@pandas_udf(\"float\", PandasUDFType.SCALAR)\n", + "def binFloat(z):\n", + " return pd.Series((z-zmin)/dz)\n", + "timer.start()\n", + "p_udf=gal.select(gal.z,binFloat(\"z\").astype('int').alias('bin')).groupBy(\"bin\").count().orderBy(F.asc(\"bin\")).toPandas()\n", + "timer.stop()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Tomography\n", + "This is a popular method in cosmology when one builds \"shells\" in some redshift range of over-densities and compute their 2D power-spectrum (using some spherical pixelization as HealPix). As the redshift decreases, matter clusters more and more and it is expected that these spectra increase in amplitude. Some cross-correlations between nearby bins also appear because of the smearing between the true redhifts (\"z\") and the observed ones (\"zrec\")." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "# define a function that filters galaxies within a redshift range and prject them onto a healpix map\n", + "# that is returned\n", + "\n", + "import numpy as np\n", + "import healpy as hp\n", + "\n", + "nside=512\n", + "\n", + "def redshift_bin(gal,z1,z2): \n", + " timer.start()\n", + " #filtering\n", + " shell=gal.filter(gal['zrec'].between(z1,z2))\n", + " print(\"shell=[{},{}] N={}M\".format(z1,z2,shell.count()/1e6))\n", + " \n", + " #projection into healpixels: build a pandas_udf that transforms (ra,dec) into (theta,phi) \n", + " # and calls healpy ang2pix function \n", + " @pandas_udf('int', PandasUDFType.SCALAR)\n", + " def Ang2Pix(ra,dec):\n", + " return pd.Series(hp.ang2pix(nside,np.radians(90-dec),np.radians(ra)))\n", + " map=shell.select(Ang2Pix(\"RA\",\"Dec\").alias(\"ipix\")).groupBy(\"ipix\").count().toPandas()\n", + " \n", + " #back to python world\n", + " myMap = np.zeros(hp.nside2npix(nside))\n", + " myMap[map['ipix'].values]=map['count'].values\n", + " timer.stop()\n", + " return myMap" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Do a loop over redshift bins (choosen here so that there are some overlap between adajacent ones for the true redshift) and store the Healpix maps with the number of galaxies in each pixel in a dictionary for later use." + ] + }, + { + "cell_type": "code", + "execution_count": 54, + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "shell=[0.0,0.13] N=68.557105M\n", + "27.8s\n", + "shell=[0.13,0.27] N=322.103946M\n", + "24.7s\n", + "shell=[0.27,0.43] N=669.646368M\n", + "30.3s\n", + "shell=[0.43,0.63] N=1049.274954M\n", + "35.3s\n", + "shell=[0.63,0.82] N=980.216568M\n", + "37.5s\n", + "shell=[0.82,1.05] N=984.086242M\n", + "33.9s\n", + "shell=[1.05,1.32] N=812.257315M\n", + "30.7s\n", + "shell=[1.32,1.61] N=529.298439M\n", + "30.5s\n", + "shell=[1.61,1.95] N=325.212359M\n", + "27.8s\n", + "shell=[1.95,2.32] N=156.932408M\n", + "26.4s\n" + ] + }, + { + "data": { + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "#redshift bins\n", + "zshell=[0.0,0.13,0.27,0.43,0.63,0.82,1.05,1.32,1.61,1.95,2.32]\n", + "tmap=dict()\n", + "\n", + "#loop over bins and store healpix maps in a dictionary indexed by z1\n", + "plt.figure(figsize=(15,10))\n", + "for (z1,z2) in zip(zshell[:-1],np.roll(zshell,-1)): \n", + " tmap[z1]=redshift_bin(gal,z1,z2) " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now do a standard analysis as the density contrast power spectra with (standard) healpix functions" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "bin=[0.0,0.13] mean #gals=21.7937167485555\n", + "bin=[0.13,0.27] mean #gals=102.39408683776855\n", + "bin=[0.27,0.43] mean #gals=212.87484741210938\n", + "bin=[0.43,0.63] mean #gals=333.5555248260498\n", + "bin=[0.63,0.82] mean #gals=311.60245513916016\n", + "bin=[0.82,1.05] mean #gals=312.83259137471515\n", + "bin=[1.05,1.32] mean #gals=258.209646542867\n", + "bin=[1.32,1.61] mean #gals=168.25944232940674\n", + "bin=[1.61,1.95] mean #gals=103.38222471872966\n", + "bin=[1.95,2.32] mean #gals=49.88746897379557\n" + ] + }, + { + "data": { + "text/plain": [ + "Text(0,0.5,'power spectrum')" + ] + }, + "execution_count": 59, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "for (z1,z2) in zip(zshell[:-1],np.roll(zshell,-1)): \n", + " galmap=tmap[z1]\n", + " #compute mean\n", + " Nmean=np.mean(galmap)\n", + " print(\"bin=[{},{}] mean #gals={}\".format(z1,z2,Nmean))\n", + " #and do fractional difference\n", + " dens_map=galmap/Nmean-1\n", + " #compute power-spectrum on the spehere using a standrd healpix function\n", + " cl=hp.anafast(dens_map)\n", + " plt.plot(cl,label=r\"$z \\in [{},{}]$\".format(z1,z2))\n", + "\n", + "plt.xlim(1,120)\n", + "plt.legend()\n", + "plt.xlabel(r\"$\\ell$\")\n", + "plt.ylabel(\"power spectrum\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From c024c70f80d1994e86db998fad38c60fa00f4e50 Mon Sep 17 00:00:00 2001 From: Julien Date: Fri, 8 Feb 2019 13:31:14 +0100 Subject: [PATCH 2/3] Update the README with instructions --- README.md | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d3a4d58..4a15ff9 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,28 @@ -# 1807.03078 -Supplementary material for arXiv:1807.03078 +# Analysing billion-objects catalogue interactively: Apach Spark for physicists + +This repository contains supplementary material for arXiv:[1807.03078](https://arxiv.org/abs/1807.03078). + +## How to run the notebook + +You must have [Apache Spark](http://spark.apache.org/) and [Jupyter notebook](https://jupyter.org/) installed on your machine or your cluster. + +### On a local machine + +```bash +PACK="com.github.astrolabsoftware:spark-fits_2.11:0.7.2" +PYSPARK_DRIVER_PYTHON_OPTS="jupyter-notebook" pyspark \ + --master local[*] \ + --packages $PACK +``` + +### On a cluster + +Standalone mode: + +```bash +PACK="com.github.astrolabsoftware:spark-fits_2.11:0.7.2" +PYSPARK_DRIVER_PYTHON_OPTS="jupyter-notebook --debug --no-browser --port=$PORT1" pyspark \ + --master $SPARKURL \ + --packages $PACK \ + --driver-memory $MEMDRIVER --executor-memory $MEMEXEC --executor-cores $EXECCORES --total-executor-cores $TOTALCORES +``` \ No newline at end of file From b72e69999cacd8450f02b55210a8aff2b490d771 Mon Sep 17 00:00:00 2001 From: Julien Date: Fri, 8 Feb 2019 13:34:04 +0100 Subject: [PATCH 3/3] Add instructions for NERSC users --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4a15ff9..1e4c25c 100644 --- a/README.md +++ b/README.md @@ -25,4 +25,8 @@ PYSPARK_DRIVER_PYTHON_OPTS="jupyter-notebook --debug --no-browser --port=$PORT1" --master $SPARKURL \ --packages $PACK \ --driver-memory $MEMDRIVER --executor-memory $MEMEXEC --executor-cores $EXECCORES --total-executor-cores $TOTALCORES -``` \ No newline at end of file +``` + +### DESC members: working at NERSC + +Source your DESC environement. Then go to the Jupyter Lab web [interface](https://jupyter-dev.nersc.gov/), and execute the notebook with the desc-pyspark kernel. \ No newline at end of file