diff --git a/docs/tutorials.rst b/docs/tutorials.rst index 5c50be9e..60106b2b 100644 --- a/docs/tutorials.rst +++ b/docs/tutorials.rst @@ -10,6 +10,7 @@ functionality. Working with the TAPE Ensemble object Working with HiPSCat and LSDB data Working with the TAPE Timeseries object + Common Data Operations with TAPE Scaling to Large Data Volume Working with Structure Function Using Ray with the Ensemble diff --git a/docs/tutorials/common_data_operations.ipynb b/docs/tutorials/common_data_operations.ipynb new file mode 100644 index 00000000..51e326a7 --- /dev/null +++ b/docs/tutorials/common_data_operations.ipynb @@ -0,0 +1,582 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Common Data Operations with `TAPE`" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this notebook, we'll highlight a handful of common dataframe operations that can be performed within `TAPE`.\n", + "\n", + "> **_Note:_**\n", + "`TAPE` extends the `Pandas`/`Dask` API, so users familiar with those APIs can expect many operations to be near-identical when working with `TAPE`." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's consider a small example dataset of Stripe 82 RR Lyrae:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from tape import Ensemble\n", + "\n", + "ens = Ensemble()\n", + "\n", + "ens.from_dataset(\"s82_rrlyrae\", sort=True)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Inspection\n", + "\n", + "These functions provide views into the contents of your `Ensemble` dataframes, which is especially important when dealing with large data volumes that cannot be brought into memory all at once." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Lazy View of an `EnsembleFrame`\n", + "\n", + "The most basic inspection method is to simply call the `EnsembleFrame` (dataframe) objects themselves. This returns a lazy view of the `EnsembleFrame`, meaning no data is loaded." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.object" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.source" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using `compute()` to view the data\n", + "\n", + "When an `EnsembleFrame`'s contents are small enough to fit into memory, you can use `compute()` to view the actual data.\n", + "\n", + "> **_Note:_**\n", + "`compute()` also performs the actual computation of the in-memory result, executing any queued loading/filtering/analysis needed to produce it, so this can take a long time!" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.object.compute()" + ] + },
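+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you only need a summary quantity, you don't have to materialize the whole frame. As a minimal sketch (assuming only that the usual `Dask` dataframe aggregations are available on an `EnsembleFrame`), here we compute just the mean of the \"flux\" column:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# compute a single aggregate instead of the full in-memory dataframe\n", + "ens.source[\"flux\"].mean().compute()" + ] + },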
+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Grab small in-memory views with `head()`\n", + "\n", + "Often, you'll want to peek at your data even when the full dataset is too large for memory.\n", + "\n", + "> **_Note:_**\n", + "By default this only looks at the first partition of data, so any operations that remove all data from the first partition will produce an empty head result. Specify `npartitions=-1` to grab from all partitions.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.source.head(5, npartitions=-1) # grabs the first 5 rows\n", + "\n", + "# you can also use tail() to grab the last 5 rows" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Getting Individual Lightcurves\n", + "\n", + "Several methods exist to access individual lightcurves within the `Ensemble`." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Access using a known ID\n", + "\n", + "If you'd like to access a particular lightcurve given an ID, you can use the `to_timeseries()` function. You supply a given object ID, and it returns a `TimeSeries` object (see [working_with_the_timeseries](working_with_the_timeseries.ipynb))." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ts = ens.to_timeseries(13350)\n", + "ts.data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "for band in ts.data.band.unique():\n", + "    plt.errorbar(\n", + "        ts.data.loc[band][\"mjd\"],\n", + "        ts.data.loc[band][\"flux\"],\n", + "        yerr=ts.data.loc[band][\"error\"],\n", + "        fmt=\".\",\n", + "        label=band,\n", + "    )\n", + "\n", + "plt.ylim(16, 20)\n", + "plt.legend()\n", + "plt.title(ts.meta[\"id\"])" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Access a random lightcurve\n", + "\n", + "Alternatively, if you aren't interested in a particular lightcurve, you can draw a random one from the `Ensemble` using `Ensemble.select_random_timeseries()`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.select_random_timeseries(seed=1).data" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Filtering\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Queries\n", + "Queries mirror the `Pandas` implementation. Specifically, `query()` takes a string expression indicating which rows to **keep**." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# define a query to remove the top 5% of flux values\n", + "highest_flux = ens.source[ens._flux_col].quantile(0.95).compute()\n", + "ens.source.query(f\"{ens._flux_col} < {highest_flux}\").compute()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "> **_Note:_**\n", + "When filtering, or doing any operations that modify a dataframe, the result is a new dataframe that does not automatically update the `Ensemble`. If you'd like to update the `Ensemble` with the result of any of these operations, be sure to add `.update_ensemble()` to the end of the call." + ] + },
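+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "For instance, a minimal sketch of that pattern, reusing the flux query from above (shown as a snippet rather than executed here, so the tutorial data stays unfiltered):\n", + "\n", + "```python\n", + "# filter the source table and write the result back into the Ensemble\n", + "ens.source.query(f\"{ens._flux_col} < {highest_flux}\").update_ensemble()\n", + "```" + ] + },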
+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Filtering by Number of Observations\n", + "\n", + "Filters based on the number of observations are directly supported by the TAPE API. First, use the dedicated function `Ensemble.calc_nobs()` to calculate the number of observations per lightcurve:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.calc_nobs(by_band=True)\n", + "\n", + "ens.object[[\"nobs_u\", \"nobs_g\", \"nobs_r\", \"nobs_i\", \"nobs_z\", \"nobs_total\"]].head(5)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can then query on these columns as normal." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.object.query(\"nobs_total > 322\")[[\"nobs_u\", \"nobs_g\", \"nobs_r\", \"nobs_i\", \"nobs_z\", \"nobs_total\"]].head(5)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Alternatively, if you'd just like to quickly filter by the total number of observations, you can use `Ensemble.prune()`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.prune(322) # equivalent to the above\n", + "ens.object[[\"nobs_total\"]].head(5)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Removing NaNs\n", + "\n", + "Removing rows with NaN values follows the `Pandas` API, using `dropna()`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Remove any rows with a NaN value in any of the specified columns\n", + "ens.source.dropna(subset=[\"flux\", \"mjd\", \"error\", \"band\"]).update_ensemble()\n", + "ens.source" + ] + },
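+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As a quick check, we can count the NaN values remaining in each column; this sketch assumes the standard `Pandas`/`Dask` `isna()`/`sum()` methods behave as usual on an `EnsembleFrame`. Every count should now be zero:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# count remaining NaNs per column\n", + "ens.source.isna().sum().compute()" + ] + },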
+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Analysis" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Applying Functions with `Ensemble.batch()`\n", + "\n", + "The `Ensemble` provides a powerful batching interface, `Ensemble.batch()`, with built-in parallelization (provided the input data is in multiple partitions)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "\n", + "# Defining a simple function\n", + "def my_flux_average(flux_array, band_array, method=\"mean\", band=None):\n", + "    \"\"\"Read in an array of fluxes, and return the average of the fluxes by band\"\"\"\n", + "    if band is not None:\n", + "        band_flux = flux_array[band_array == band]  # Select the fluxes in the requested band\n", + "    else:\n", + "        band_flux = flux_array  # No band given, use all fluxes\n", + "    if method == \"mean\":\n", + "        res = np.mean(band_flux)\n", + "    elif method == \"median\":\n", + "        res = np.median(band_flux)\n", + "    else:\n", + "        res = None\n", + "    return res" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "With the function defined, we next supply it to `Ensemble.batch()`. The column labels of the `Ensemble` columns we want to use as arguments must be provided, as well as any keyword arguments. In this case, we pass along `\"flux\"` and `\"band\"`, so that the `Ensemble` will map those columns to `flux_array` and `band_array` respectively. We also pass `method='median'` and `band='g'`, which are forwarded as keyword arguments to `my_flux_average`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Applying the function to the ensemble\n", + "res = ens.batch(my_flux_average, \"flux\", \"band\", meta=None, method=\"median\", band=\"g\")\n", + "res.compute()" + ] + },
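+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Simple callables can be batched in the same way. As a minimal sketch (assuming `batch()` accepts a plain `NumPy` reduction with `meta=None`, as in the call above), here we apply `np.mean` directly to each lightcurve's flux values:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# apply a plain numpy reduction to each lightcurve's flux values\n", + "ens.batch(np.mean, \"flux\", meta=None).compute()" + ] + },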
+ { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`Ensemble.batch()` supports many different variations of custom user functions, and additionally has a small suite of tailored analysis functions designed for it. For more details on batch, see the [batch showcase](batch_showcase.ipynb)." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Column Assignment\n", + "\n", + "The `Ensemble` dataframes support column assignment through the `Pandas` `assign` function. We can pass in either a callable or a series to assign to the new column. New column names are produced automatically from the argument name.\n", + "\n", + "For example, if we want to compute the lower bound of an error range as the estimated flux minus twice the estimated error, we would use:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "lower_bnd = ens.source.assign(lower_bnd=lambda x: x[\"flux\"] - 2.0 * x[\"error\"])\n", + "lower_bnd.head(5)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can also assign our computed batch result as a new object column using the same methodology." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.object.assign(g_average=res[\"result\"])[[\"ra\", \"dec\", \"g_average\"]].head(5)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Dask Tips\n" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using `persist()` to Save Computation Time\n", + "\n", + "When calling `compute()`, all work needed to produce the in-memory result is performed. This work is repeated each time `compute()` is called, which can duplicate a lot of computational work, especially in exploratory notebooks where you're testing different workflows. In such cases, it can be advantageous to call `persist()`.\n", + "\n", + "`persist()` returns a lazy view of a result, but actively begins computing that result behind the scenes, so successive calls can simply grab the persisted result rather than recomputing it. Because the persisted result is held in memory, `persist()` should only be used when your data can fit into memory." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.source.persist().update_ensemble() # persist performs all queued data loading tasks, and we keep the persisted frame\n", + "ens.source.compute() # which allows compute to just pull the persisted result immediately." + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Repartitioning\n", + "\n", + "With `Dask` and `TAPE`, data is stored in separate sub-containers called \"partitions\". [`Dask` has recommendations](https://docs.dask.org/en/stable/best-practices.html#dask-best-practices) for the optimal amount of data to store in a given partition, and even if the initial data follows these recommendations, filtering steps can leave partitions containing very little data. In this case, it may be best to call `repartition()`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.source.repartition(partition_size=\"100MB\").update_ensemble() # 100MB partitions are generally recommended\n", + "ens.source # In this case, we have a small set of data that easily fits into one partition" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Sampling\n", + "\n", + "In addition to filtering by specific constraints, it's possible to select a subset of your data to work with. `Ensemble.sample()` will randomly select a fraction of objects from the full object list. This returns a new `Ensemble` object to work with." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "subset_ens = ens.sample(frac=0.5) # select ~half of the objects\n", + "\n", + "print(\"Number of pre-sampled objects: \", len(ens.object))\n", + "print(\"Number of post-sampled objects: \", len(subset_ens.object))" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "> **_Note:_**\n", + "Using `Ensemble.sample` to filter large datasets is not recommended, as it does not handle repartitioning. Instead, use partition slicing, as shown below." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# partition slicing\n", + "\n", + "# select a subset of source partitions; this propagates to the object table automatically\n", + "ens.source.partitions[0:1].update_ensemble()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Saving Intermediate Results\n", + "\n", + "In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy computation, this involves repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be effective to instead save the `Ensemble` state to disk after completing these initial processing steps. To accomplish this, we can use the `Ensemble.save_ensemble()` function." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.save_ensemble(\".\", \"ensemble\", additional_frames=False) # Saves to disk" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The above command creates an \"ensemble\" directory in the current working directory. This directory contains a subdirectory of parquet files for each `EnsembleFrame` object that was included via the `additional_frames` kwarg. Note that setting `additional_frames` to True or False saves all or none of the additional `EnsembleFrame` objects, respectively, and that the source frame, and the object frame (unless it has no columns), are always saved.\n", + "\n", + "From here, we can just load the ensemble from disk."
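+ ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Before loading it back, we can take a quick look at what was actually written (a sanity check using only the Python standard library):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "# list the contents of the saved ensemble directory\n", + "os.listdir(\"./ensemble\")"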
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "new_ens = Ensemble()\n", + "new_ens.from_ensemble(\"./ensemble\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.11" + }, + "vscode": { + "interpreter": { + "hash": "83afbb17b435d9bf8b0d0042367da76f26510da1c5781f0ff6e6c518eab621ec" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}