diff --git a/Notebooks/Colors_Run12p_spark.ipynb b/Notebooks/Colors_Run12p_spark.ipynb
new file mode 100644
index 0000000..8b2d4a1
--- /dev/null
+++ b/Notebooks/Colors_Run12p_spark.ipynb
@@ -0,0 +1,540 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# DC2 Run1.2 object catalog colors (Spark)\n",
+ "
Owner: **S Plaszczynski** (https://github.com/LSSTDESC/DC2-production/issues/299)\n",
+ "
Last Verified to Run: **2018-11-26**\n",
+ "\n",
+ "The goal of this notebook is to have a QA look a the dc2-data-access produced object_catalog (currently on Run1.2p) on the colors side\n",
+ "It also illustrates how simple and efficient Spark is.\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Read the data"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- magerr_g: double (nullable = true)\n",
+ " |-- I_flag_g: boolean (nullable = true)\n",
+ " |-- tract: long (nullable = true)\n",
+ " |-- IxyPSF_y: double (nullable = true)\n",
+ " |-- I_flag_y: boolean (nullable = true)\n",
+ " |-- Ixx_i: double (nullable = true)\n",
+ " |-- dec: double (nullable = true)\n",
+ " |-- magerr_u: double (nullable = true)\n",
+ " |-- mag_z_cModel: double (nullable = true)\n",
+ " |-- mag_u: double (nullable = true)\n",
+ " |-- snr_g_cModel: double (nullable = true)\n",
+ " |-- Ixx: double (nullable = true)\n",
+ " |-- mag_u_cModel: double (nullable = true)\n",
+ " |-- magerr_y: double (nullable = true)\n",
+ " |-- magerr_r_cModel: double (nullable = true)\n",
+ " |-- IxyPSF_r: double (nullable = true)\n",
+ " |-- psFlux_y: double (nullable = true)\n",
+ " |-- magerr_y_cModel: double (nullable = true)\n",
+ " |-- mag_g_cModel: double (nullable = true)\n",
+ " |-- Iyy_i: double (nullable = true)\n",
+ " |-- I_flag_z: boolean (nullable = true)\n",
+ " |-- snr_u_cModel: double (nullable = true)\n",
+ " |-- Ixy_z: double (nullable = true)\n",
+ " |-- psFluxErr_r: double (nullable = true)\n",
+ " |-- mag_i_cModel: double (nullable = true)\n",
+ " |-- Ixy: double (nullable = true)\n",
+ " |-- psf_fwhm_g: double (nullable = true)\n",
+ " |-- x: double (nullable = true)\n",
+ " |-- psFlux_z: double (nullable = true)\n",
+ " |-- Ixy_u: double (nullable = true)\n",
+ " |-- psNdata: float (nullable = true)\n",
+ " |-- snr_i_cModel: double (nullable = true)\n",
+ " |-- IyyPSF_z: double (nullable = true)\n",
+ " |-- Iyy_u: double (nullable = true)\n",
+ " |-- mag_i: double (nullable = true)\n",
+ " |-- parentObjectId: long (nullable = true)\n",
+ " |-- IxxPSF_y: double (nullable = true)\n",
+ " |-- IyyPSF_r: double (nullable = true)\n",
+ " |-- psFluxErr_i: double (nullable = true)\n",
+ " |-- I_flag: boolean (nullable = true)\n",
+ " |-- mag_y_cModel: double (nullable = true)\n",
+ " |-- IyyPSF_u: double (nullable = true)\n",
+ " |-- I_flag_u: boolean (nullable = true)\n",
+ " |-- magerr_u_cModel: double (nullable = true)\n",
+ " |-- xErr: float (nullable = true)\n",
+ " |-- IxxPSF_u: double (nullable = true)\n",
+ " |-- IxxPSF_r: double (nullable = true)\n",
+ " |-- I_flag_r: boolean (nullable = true)\n",
+ " |-- psFlux_flag_i: boolean (nullable = true)\n",
+ " |-- psf_fwhm_i: double (nullable = true)\n",
+ " |-- IxxPSF: double (nullable = true)\n",
+ " |-- psf_fwhm_z: double (nullable = true)\n",
+ " |-- y: double (nullable = true)\n",
+ " |-- Iyy: double (nullable = true)\n",
+ " |-- magerr_i_cModel: double (nullable = true)\n",
+ " |-- IyyPSF_y: double (nullable = true)\n",
+ " |-- psFluxErr_g: double (nullable = true)\n",
+ " |-- psFlux_flag_y: boolean (nullable = true)\n",
+ " |-- magerr_i: double (nullable = true)\n",
+ " |-- psFlux_flag_z: boolean (nullable = true)\n",
+ " |-- I_flag_i: boolean (nullable = true)\n",
+ " |-- snr_z_cModel: double (nullable = true)\n",
+ " |-- Ixy_g: double (nullable = true)\n",
+ " |-- psFlux_r: double (nullable = true)\n",
+ " |-- psf_fwhm_r: double (nullable = true)\n",
+ " |-- magerr_g_cModel: double (nullable = true)\n",
+ " |-- IyyPSF_g: double (nullable = true)\n",
+ " |-- Ixx_z: double (nullable = true)\n",
+ " |-- IxyPSF: double (nullable = true)\n",
+ " |-- psFlux_flag_r: boolean (nullable = true)\n",
+ " |-- objectId: long (nullable = true)\n",
+ " |-- Ixx_u: double (nullable = true)\n",
+ " |-- magerr_z_cModel: double (nullable = true)\n",
+ " |-- patch: string (nullable = true)\n",
+ " |-- IxyPSF_i: double (nullable = true)\n",
+ " |-- IxyPSF_u: double (nullable = true)\n",
+ " |-- IxyPSF_z: double (nullable = true)\n",
+ " |-- mag_r_cModel: double (nullable = true)\n",
+ " |-- magerr_r: double (nullable = true)\n",
+ " |-- Ixx_r: double (nullable = true)\n",
+ " |-- IxxPSF_z: double (nullable = true)\n",
+ " |-- Ixy_i: double (nullable = true)\n",
+ " |-- psf_fwhm_y: double (nullable = true)\n",
+ " |-- Iyy_g: double (nullable = true)\n",
+ " |-- Iyy_y: double (nullable = true)\n",
+ " |-- IxxPSF_i: double (nullable = true)\n",
+ " |-- mag_z: double (nullable = true)\n",
+ " |-- psFluxErr_u: double (nullable = true)\n",
+ " |-- psFlux_i: double (nullable = true)\n",
+ " |-- IyyPSF: double (nullable = true)\n",
+ " |-- clean: boolean (nullable = true)\n",
+ " |-- psFlux_u: double (nullable = true)\n",
+ " |-- mag_y: double (nullable = true)\n",
+ " |-- Ixx_y: double (nullable = true)\n",
+ " |-- blendedness: double (nullable = true)\n",
+ " |-- Iyy_r: double (nullable = true)\n",
+ " |-- mag_g: double (nullable = true)\n",
+ " |-- snr_y_cModel: double (nullable = true)\n",
+ " |-- xy_flag: boolean (nullable = true)\n",
+ " |-- magerr_z: double (nullable = true)\n",
+ " |-- psFluxErr_z: double (nullable = true)\n",
+ " |-- Ixx_g: double (nullable = true)\n",
+ " |-- ra: double (nullable = true)\n",
+ " |-- IyyPSF_i: double (nullable = true)\n",
+ " |-- psFlux_flag_u: boolean (nullable = true)\n",
+ " |-- good: boolean (nullable = true)\n",
+ " |-- IxxPSF_g: double (nullable = true)\n",
+ " |-- Ixy_y: double (nullable = true)\n",
+ " |-- psf_fwhm_u: double (nullable = true)\n",
+ " |-- yErr: float (nullable = true)\n",
+ " |-- mag_r: double (nullable = true)\n",
+ " |-- IxyPSF_g: double (nullable = true)\n",
+ " |-- psFluxErr_y: double (nullable = true)\n",
+ " |-- psFlux_g: double (nullable = true)\n",
+ " |-- snr_r_cModel: double (nullable = true)\n",
+ " |-- Ixy_r: double (nullable = true)\n",
+ " |-- extendedness: double (nullable = true)\n",
+ " |-- psFlux_flag_g: boolean (nullable = true)\n",
+ " |-- Iyy_z: double (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "# Initialise our Spark session\n",
+ "spark = SparkSession.builder.getOrCreate()\n",
+ "#read the parquet Run1.2p catalog\n",
+ "object_catalog=\"/global/cscratch1/sd/plaszczy/Run1.2p/object_catalog/full_catalog.parquet\"\n",
+ "df_all=spark.read.parquet(object_catalog)\n",
+ "df_all.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "select relevant columns concerning colors (+some goodies). Add a helapix index column"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "tract,patch,ra,dec,good,clean,psFlux_flag_u,psFlux_u,mag_u,mag_u_cModel,snr_u_cModel,psFlux_flag_g,psFlux_g,mag_g,mag_g_cModel,snr_g_cModel,psFlux_flag_r,psFlux_r,mag_r,mag_r_cModel,snr_r_cModel,psFlux_flag_i,psFlux_i,mag_i,mag_i_cModel,snr_i_cModel,psFlux_flag_z,psFlux_z,mag_z,mag_z_cModel,snr_z_cModel\n",
+ "27083536\n"
+ ]
+ }
+ ],
+ "source": [
+ "filters=['u','g','r','i','z']\n",
+ "\n",
+ "# build selection by appending to string\n",
+ "cols=\"tract,patch,ra,dec,good,clean\"\n",
+ "for f in filters:\n",
+ " s=\",psFlux_flag_{0},psFlux_{0},mag_{0},mag_{0}_cModel,snr_{0}_cModel\".format(f)\n",
+ " cols+=s\n",
+ "print(cols)\n",
+ "\n",
+ "#use these columns\n",
+ "df=df_all.select(cols.split(','))\n",
+ "\n",
+ "#it will be usefull to add a column of healpix indices\n",
+ "import pandas as pd\n",
+ "import numpy as np\n",
+ "import healpy as hp\n",
+ "from pyspark.sql.functions import pandas_udf, PandasUDFType\n",
+ "nside=1024\n",
+ "#create the ang2pix user-defined-function. \n",
+ "#we use pandas_udf because they are more efficient\n",
+ "@pandas_udf('int', PandasUDFType.SCALAR)\n",
+ "def Ang2Pix(ra,dec):\n",
+ " return pd.Series(hp.ang2pix(nside,np.radians(90-dec),np.radians(ra)))\n",
+ "#add the column\n",
+ "df=df.withColumn(\"ipix\",Ang2Pix(\"ra\",\"dec\"))\n",
+ "\n",
+ "# put in cache (use count since it is an action)\n",
+ "N=df.cache().count()\n",
+ "print(N)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## statistics per filter"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "u 1500076 N(OK)=683525 frac=0.46\n",
+ "g 1274607 N(OK)=687842 frac=0.54\n",
+ "r 898424 N(OK)=696089 frac=0.77\n",
+ "i 937069 N(OK)=695138 frac=0.74\n",
+ "z 891525 N(OK)=695884 frac=0.78\n"
+ ]
+ }
+ ],
+ "source": [
+ "for f in ['u','g','r','i','z']:\n",
+ " dff=df.select(\"psFlux_flag_\"+f,\"psFlux_\"+f).filter(df['psFlux_flag_'+f]==True)\n",
+ " N=dff.count()\n",
+ " Na=dff.na.drop().count()\n",
+ " print(\"{} {:8d} N(OK)={:6d} frac={:.2f}\".format(f,N,Na,float(Na)/N))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "same with magnitudes (keeping same flag)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "u 433267 N(OK)=433267 frac=1.00\n",
+ "g 492428 N(OK)=492428 frac=1.00\n",
+ "r 535617 N(OK)=535617 frac=1.00\n",
+ "i 539843 N(OK)=539843 frac=1.00\n",
+ "z 530983 N(OK)=530983 frac=1.00\n"
+ ]
+ }
+ ],
+ "source": [
+ "for f in ['u','g','r','i','z']:\n",
+ " dff=df.select(\"psFlux_flag_\"+f,\"psFlux_\"+f).filter((df['psFlux_flag_'+f]==True)&(df['psFlux_'+f]>0))\n",
+ " N=dff.count()\n",
+ " Na=dff.na.drop().count()\n",
+ " print(\"{} {:8d} N(OK)={:6d} frac={:.2f}\".format(f,N,Na,float(Na)/N))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "same after fit"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "u 1500076 N(OK)=420145 frac=0.28\n",
+ "g 1274607 N(OK)=483795 frac=0.38\n",
+ "r 898424 N(OK)=523131 frac=0.58\n",
+ "i 937069 N(OK)=527497 frac=0.56\n",
+ "z 891525 N(OK)=521904 frac=0.59\n"
+ ]
+ }
+ ],
+ "source": [
+ "for f in ['u','g','r','i','z']:\n",
+ " dff=df.select(\"psFlux_flag_\"+f,\"mag_\"+f+\"_cModel\",\"snr_\"+f+\"_cModel\").filter(df['psFlux_flag_'+f]==True)\n",
+ " N=dff.count()\n",
+ " Na=dff.na.drop().count()\n",
+ " print(\"{} {:8d} N(OK)={:6d} frac={:.2f}\".format(f,N,Na,float(Na)/N))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "let's apply some quality cuts"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "u 1476298 N(OK)=414500 frac=0.28\n",
+ "g 1252905 N(OK)=477701 frac=0.38\n",
+ "r 881003 N(OK)=516434 frac=0.59\n",
+ "i 919244 N(OK)=520803 frac=0.57\n",
+ "z 874421 N(OK)=515294 frac=0.59\n"
+ ]
+ }
+ ],
+ "source": [
+ "for f in ['u','g','r','i','z']:\n",
+ " dff=df.select(\"psFlux_flag_\"+f,\"mag_\"+f+\"_cModel\",\"snr_\"+f+\"_cModel\").\\\n",
+ " filter((df['psFlux_flag_'+f]==True) & (df['good']==True) & (df['clean']==True))\n",
+ " N=dff.count()\n",
+ " Na=dff.na.drop().count()\n",
+ " print(\"{} {:8d} N(OK)={:6d} frac={:.2f}\".format(f,N,Na,float(Na)/N))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## maps per filter"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "amazing function to get (approximately) tract borders from the data (only superior minds can understand is beauty)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "10645\n",
+ "4848-4636-4639-5065-5064-4637-4432-5063-4640-4433-4429-5066-4430-4638-4431-4851-4849-4852-5062-4850-"
+ ]
+ }
+ ],
+ "source": [
+ "#list of tracts\n",
+ "tracts=df.select(\"tract\").distinct().toPandas()\n",
+ "df_withpix=df.groupBy([\"tract\",\"ipix\"]).count().drop('count')\n",
+ "print(df_withpix.cache().count())\n",
+ "ipix=np.arange(hp.nside2npix(nside))\n",
+ "pixborder=[]\n",
+ "for t in tracts['tract'].values :\n",
+ " print(t,end='-')\n",
+ " #create a map just for this tract\n",
+ " df_map=df_withpix.filter(df_withpix.tract==int(t))\n",
+ " #create the healpix map\n",
+ " tract_p=df_map.toPandas()\n",
+ " tract_map = np.full(hp.nside2npix(nside),hp.UNSEEN)\n",
+ " tract_map[tract_p['ipix'].values]=1\n",
+ " # for lit pixels compute the neighbours\n",
+ " ipix1=tract_p['ipix'].values\n",
+ " theta,phi=hp.pix2ang(nside,ipix1)\n",
+ " neighbours=hp.get_all_neighbours(nside,theta,phi,0).transpose()\n",
+ " # border if at least one neighbours is UNSEEN\n",
+ " mask=[(hp.UNSEEN in tract_map[neighbours[pixel]]) for pixel in range(len(ipix1))]\n",
+ " pixborder+=list(ipix1[mask])"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "map for each filter (at least where flag non false and not NaN)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "u 46.53281065406508 21.6152811401939\n",
+ "g 52.46409546132538 23.692179764865628\n",
+ "r 56.339223729883244 24.028730006252143\n",
+ "i 56.879464756084715 23.916217713593976\n",
+ "z 55.863545502367174 24.12504692541923\n"
+ ]
+ },
+ {
+ "data": {
+ "text/plain": [
+ "