diff --git a/docs/examples/rrlyr-period.ipynb b/docs/examples/rrlyr-period.ipynb
index 2dfa0089..d55fa80b 100644
--- a/docs/examples/rrlyr-period.ipynb
+++ b/docs/examples/rrlyr-period.ipynb
@@ -66,7 +66,7 @@
     "periodogram = Periodogram(peaks=1, nyquist=0.1, max_freq_factor=10, fast=False)\n",
     "\n",
     "# Use r band only\n",
-    "df = ens.batch(periodogram, band_to_calc='r')\n",
+    "df = ens.batch(periodogram, band_to_calc='r').compute()\n",
     "display(df)\n",
     "\n",
     "# Find RR Lyr with the most confident period\n",
@@ -121,13 +121,13 @@
   "language_info": {
    "codemirror_mode": {
     "name": "ipython",
-    "version": 2
+    "version": 3
    },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
-  "pygments_lexer": "ipython2",
+  "pygments_lexer": "ipython3",
   "version": "3.10.11"
  },
  "vscode": {
diff --git a/docs/gettingstarted/quickstart.ipynb b/docs/gettingstarted/quickstart.ipynb
index 110442df..580dc39c 100644
--- a/docs/gettingstarted/quickstart.ipynb
+++ b/docs/gettingstarted/quickstart.ipynb
@@ -108,7 +108,7 @@
   "source": [
    "from tape.analysis import calc_sf2\n",
    "result = ens.batch(calc_sf2, sf_method=\"macleod_2012\") # The batch function applies the provided function to all individual lightcurves within the Ensemble\n",
-   "result"
+   "result.compute()"
   ]
  },
 {
diff --git a/docs/tutorials.rst b/docs/tutorials.rst
index 8cb1c6cf..d97a276a 100644
--- a/docs/tutorials.rst
+++ b/docs/tutorials.rst
@@ -11,6 +11,7 @@ functionality.
     Scaling to Large Data Volume
     Working with Structure Function
     Binning Sources in the Ensemble
+    Batch Function Showcase
     Structure Function Showcase
     Loading Data into the Ensemble
     Using Ray with the Ensemble
diff --git a/docs/tutorials/batch_showcase.ipynb b/docs/tutorials/batch_showcase.ipynb
new file mode 100644
index 00000000..a76631d4
--- /dev/null
+++ b/docs/tutorials/batch_showcase.ipynb
@@ -0,0 +1,578 @@
+{
+ "cells": [
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Ensemble Batch Showcase\n",
+    "\n",
+    "`Ensemble.batch` is a versatile function that lets users pass in external functions that operate on groupings of `Ensemble` data; most commonly, these are functions that calculate something per lightcurve. Because external functions can have a huge variety of inputs and outputs, this notebook serves as a collection of example functions and shows how `batch` can be used with each. Hopefully one of these examples is similar to the function you are trying to apply via `batch`, so it can serve as a template for getting your own function to work."
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Generate some toy data and create an Ensemble"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from tape import Ensemble, ColumnMapper, TapeFrame\n",
+    "import numpy as np\n",
+    "import pandas as pd\n",
+    "import sys"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Generate some fake data\n",
+    "\n",
+    "np.random.seed(1)\n",
+    "\n",
+    "obj_ids = []\n",
+    "mjds = []\n",
+    "for i in range(10,110):\n",
+    "    obj_ids.append(np.array([i]*1250))\n",
+    "    mjds.append(np.arange(0.,1250.,1.))\n",
+    "obj_ids = np.concatenate(obj_ids)\n",
+    "mjds = np.concatenate(mjds)\n",
+    "\n",
+    "flux = 10*np.random.random(125000)\n",
+    "err = flux/10\n",
+    "band = np.random.choice(['g','r'], 125000)\n",
+    "\n",
+    "source_dict = {\"id\":obj_ids, \"mjd\":mjds, \"flux\":flux, \"err\":err, \"band\":band}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Load the data into an Ensemble\n",
+    "ens = Ensemble()\n",
+    "\n",
+    "ens.from_source_dict(source_dict, column_mapper=ColumnMapper(id_col=\"id\",\n",
+    "                                                             time_col=\"mjd\",\n",
+    "                                                             flux_col=\"flux\",\n",
+    "                                                             err_col=\"err\",\n",
+    "                                                             band_col=\"band\"))"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Case 1: A Simple Mean"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We define a simple function that takes in an array-like argument, `flux`, and returns its mean."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Case 1: Simple\n",
+    "def my_mean(flux):\n",
+    "    return np.mean(flux)\n",
+    "\n",
+    "my_mean([1,2,3,4,5])"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To run the `my_mean` function with `Ensemble.batch`, we simply pass the function and its argument(s) as separate arguments. In this case, we pass \"flux\" as a string, and `batch` will grab the data at that column label to evaluate on."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Default batch\n",
+    "res1 = ens.batch(my_mean, \"flux\") # \"flux\" is provided to have TAPE pass the \"flux\" column data along to my_mean\n",
+    "res1.compute() # Compute to see the result"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "By default, `Ensemble.batch` groups each lightcurve together (grouping on the specified id column). However, batch also supports custom grouping assignments; below, we instead pass `on=[\"band\"]`, letting batch know to calculate the mean over all data from each band."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Batch with custom grouping\n",
+    "\n",
+    "res2 = ens.batch(my_mean, \"flux\", on=[\"band\"])\n",
+    "res2.compute()"
+   ]
+  },
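+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As an aside, nothing about `batch` requires a hand-written function; any array-reducing callable works. A minimal sketch, mirroring the `amplitudes = ens.batch(np.ptp, ens._flux_col)` example from the `batch` docstring, computes per-lightcurve amplitudes with NumPy's `ptp`:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Any array-reducing callable can be passed to batch, not just user-defined\n",
+    "# functions; np.ptp returns max - min, i.e. the amplitude of each lightcurve\n",
+    "res_ptp = ens.batch(np.ptp, \"flux\")\n",
+    "res_ptp.compute()"
+   ]
+  },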
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This can be extended to more than a single column; below we group by id and then sub-group by band. In `Pandas`, an operation like this would return a multi-index, but because `Dask` does not support multi-indexes, we return the sub-groupings as columns."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Multi-level groupbys\n",
+    "\n",
+    "res3 = ens.batch(my_mean, \"flux\", on=[\"id\", \"band\"])\n",
+    "res3.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Sub-grouping by photometric band is a use case we expect to be common in TAPE workflows, and so batch provides the `by_band` kwarg. This ensures that the last sub-grouping level is on band and returns independent columns for each band's result."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Batch with the by_band flag\n",
+    "res4 = ens.batch(my_mean, \"flux\", by_band=True)\n",
+    "res4.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Case 2: Functions That Return a Series"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In case 2, we write a function that returns a `pandas.Series` object. This object has the min and max of the flux array stored at different indices of the output series."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def my_bounds(flux):\n",
+    "    return pd.Series({\"min\":np.min(flux), \"max\":np.max(flux)})\n",
+    "\n",
+    "# Function output\n",
+    "my_bounds([1,2,3,4,5])"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As in case 1, we're able to pass this function and the \"flux\" column along to run the function. However, this time we need the `meta` to be set. The `meta` is a required component of `Dask`'s lazy evaluation: because `Dask` does not actually compute results until requested, `meta` describes the expected form of the output. In this case, we just need to let `Dask` know that \"min\" and \"max\" columns will be present in a dataframe (TAPE will always return a dataframe) and that both hold float values.\n",
+    "\n",
+    "For more information on the `Dask` meta argument, read their [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Default Batch\n",
+    "\n",
+    "res1 = ens.batch(my_bounds, \"flux\", meta={\"min\":float, \"max\":float}) # Requires meta to be set\n",
+    "res1.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The same grouping flexibility extends to case 2, again with the `meta` needing to be specified. Note that the meta given to `Ensemble.batch` depends only on the function output and so remains the same; `batch` handles the meta for any columns generated by the grouping on its own."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Multi-level groupbys, note that meta does not need to change\n",
+    "res2 = ens.batch(my_bounds, \"flux\", on=[\"id\", \"band\"], meta={\"min\":float, \"max\":float}) # Requires meta to be set\n",
+    "res2.compute()"
+   ]
+  },
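+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As an aside, the dict form of `meta` used above is shorthand. Since `batch` hands `meta` straight along to `Dask`, the same information can be expressed as an empty dataframe with the expected columns and dtypes (a minimal sketch of a standard `Dask` pattern):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# An empty DataFrame with the right columns and dtypes is an equivalent meta\n",
+    "meta_df = pd.DataFrame({\"min\": pd.Series(dtype=float), \"max\": pd.Series(dtype=float)})\n",
+    "res_alt = ens.batch(my_bounds, \"flux\", meta=meta_df)\n",
+    "res_alt.compute()"
+   ]
+  },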
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Using the `by_band` kwarg extends the output columns to be per-band."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Using by_band\n",
+    "\n",
+    "res3 = ens.batch(my_bounds, \"flux\", by_band=True, meta={\"min\":float, \"max\":float}) # Requires meta to be set\n",
+    "res3.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Case 3: Functions That Return a DataFrame"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Here we define a function, `my_bounds_df`, that computes the same quantities as `my_bounds` above, but in this case we return a dataframe of the results."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def my_bounds_df(flux):\n",
+    "    return pd.DataFrame({'min':[np.min(flux)], 'max':[np.max(flux)]})\n",
+    "\n",
+    "my_bounds_df([1,2,3,4,5])"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This is perfectly reasonable, but when passing a function like this through `batch` there is currently an issue to watch out for."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Default Batch, some things to watch out for\n",
+    "\n",
+    "res1 = ens.batch(my_bounds_df, \"flux\", meta={'min':float, 'max':float})\n",
+    "res1.compute()\n"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As with the series, we needed to pass the `meta` kwarg to let TAPE know which output columns to expect from the function. However,\n",
+    "we see that our result is carrying over the index generated by the dataframe in addition to the batch index, represented as a multi-index. At the time of this notebook's creation, `Dask` does not have explicit support for multi-indexes. We can see this problem in the following cells."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Pandas resolves these indexes as a multi-index\n",
+    "res1.reset_index().compute()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Dask assumes there's just a single index column being sent to the dataframe columns\n",
+    "res1.reset_index()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "When `Dask` and the underlying `Pandas` disagree on what the dataframe looks like, it becomes difficult for you as the user to work with the dataframe: `Dask` won't recognize any calls to \"id\" or \"level_1\" here, and will instead only accept a call to \"index\", which in turn `Pandas` won't understand. If you run into this issue, we recommend trying to modify your function to a non-dataframe output format. In the case that this isn't possible, here's a somewhat hacky way to work around it.\n",
+    "\n",
+    "We can resolve this by manually updating the `Dask` meta to re-align `Dask` and `Pandas`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# If it's not too compute intensive, grabbing the actual dataframe is the easiest way forward\n",
+    "real_meta_from_result = res1.reset_index().head(0)\n",
+    "real_meta_from_result"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Otherwise, we can generate this ourselves\n",
+    "real_meta_from_dataframe = TapeFrame(columns=[\"id\",\"level_1\",\"min\",\"max\"])\n",
+    "real_meta_from_dataframe"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Overwrite the _meta property\n",
+    "\n",
+    "res1_noindex = res1.reset_index()\n",
+    "res1_noindex._meta = real_meta_from_dataframe\n",
+    "res1_noindex"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Note that in the above, we've reset the index, as `Dask` will not support meta that tracks a multi-index. In the case of this function, we gain no information from the \"level_1\" column, and it would be nice to re-establish \"id\" as the index, so we close the loop by executing the commands in the next cell."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "res1 = res1_noindex.drop(columns=[\"level_1\"]).set_index(\"id\")\n",
+    "res1.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Case 4: Functions that Require Non-Array Inputs"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's return to case 1, but this time, instead of the list-like `flux` argument, let's say that the function needs to take in a dataframe with a column titled `my_flux`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Case 4: DataFrame input\n",
+    "def my_mean_from_df(df):\n",
+    "    return np.mean(df['my_flux'])\n",
+    "\n",
+    "df = pd.DataFrame({'my_flux':[1, 2, 3, 4, 5]})\n",
+    "my_mean_from_df(df)"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In this case, batch won't be able to directly provide inputs to this function, as batch passes the column data along to the function as arrays. However, we can make this function usable by batch by wrapping it with another function."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def mean_wrapper(flux):\n",
+    "    df = pd.DataFrame({'my_flux': flux})\n",
+    "    return my_mean_from_df(df)\n",
+    "\n",
+    "# Can pass the wrapper function along to batch\n",
+    "res1 = ens.batch(mean_wrapper, \"flux\")\n",
+    "res1.compute()"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This is a really simple case, but it highlights that a wrapper function can serve as a middleman between your function and `batch`, even sorting or filtering your data on a per-call basis if that work was not already done as a pre-filtering step on your Ensemble; a sketch of such a wrapper follows below."
+   ]
+  },
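+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For example, here is a minimal, hypothetical sketch of a wrapper that sorts by time and drops non-finite fluxes before calling the wrapped function. Note that `batch` can pass multiple columns along as separate array arguments, so the wrapper can take both \"mjd\" and \"flux\":"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def clean_mean_wrapper(time, flux):\n",
+    "    \"\"\"Sort by time and drop non-finite fluxes before taking the mean.\"\"\"\n",
+    "    time = np.asarray(time)\n",
+    "    flux = np.asarray(flux)\n",
+    "    flux = flux[np.argsort(time)]  # per-call sorting\n",
+    "    flux = flux[np.isfinite(flux)]  # per-call filtering\n",
+    "    return my_mean_from_df(pd.DataFrame({'my_flux': flux}))\n",
+    "\n",
+    "res_clean = ens.batch(clean_mean_wrapper, \"mjd\", \"flux\")\n",
+    "res_clean.compute()"
+   ]
+  },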
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Case 5: TAPE Analysis Functions"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "TAPE analysis functions are a special case of input function to `Ensemble.batch`: normally required information, such as the column labels to pass to the function and the `meta`, is supplied to `Ensemble.batch` internally, meaning you only need to specify the function and any additional kwargs. For this case, let's leverage the [light-curve](https://github.com/light-curve/light-curve-python) package, which implements the extraction of many light curve [features](https://github.com/light-curve/light-curve-python?tab=readme-ov-file#available-features) used in astrophysics. Feature extraction from this package is also supported within TAPE as an analysis function."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Grab two feature extraction methods from light-curve\n",
+    "from light_curve import Periodogram, OtsuSplit"
+   ]
+  },
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "In the example below, we apply the Lomb-Scargle periodogram to our `Ensemble` light curves. Note again that the `meta` we had to configure above is already handled by TAPE, and the needed timeseries columns are passed along internally as well."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Find periods using Lomb-Scargle periodogram\n",
+    "periodogram = Periodogram(peaks=1, nyquist=0.1, max_freq_factor=10, fast=False)\n",
+    "\n",
+    "# Use r band only\n",
+    "res_per = ens.batch(periodogram, band_to_calc='r') # band_to_calc is a kwarg of Periodogram\n",
+    "res_per.compute()"
+   ]
+  },
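+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Several `light-curve` features can also be bundled into a single `Extractor` and computed in one `batch` pass; the following is a minimal sketch mirroring the `Extractor` usage in the \"Working with the Ensemble\" tutorial:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import light_curve as licu\n",
+    "\n",
+    "# Extractor bundles several features into one batch pass\n",
+    "extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())\n",
+    "res_multi = ens.batch(extractor, band_to_calc=\"g\")\n",
+    "res_multi.compute()"
+   ]
+  },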
+  {
+   "attachments": {},
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Next, we use the `OtsuSplit` feature, which performs automatic thresholding. In this case, we also supply the `by_band` kwarg to get a result per photometric band."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "res_otsu = ens.batch(OtsuSplit(), band_to_calc=None, by_band=True)\n",
+    "res_otsu.compute()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.11"
+  },
+  "vscode": {
+   "interpreter": {
+    "hash": "83afbb17b435d9bf8b0d0042367da76f26510da1c5781f0ff6e6c518eab621ec"
+   }
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/docs/tutorials/scaling_to_large_data.ipynb b/docs/tutorials/scaling_to_large_data.ipynb
index 9e38f6d2..6624b7e2 100644
--- a/docs/tutorials/scaling_to_large_data.ipynb
+++ b/docs/tutorials/scaling_to_large_data.ipynb
@@ -263,7 +263,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.10.13"
+  "version": "3.10.6"
  },
 "vscode": {
  "interpreter": {
diff --git a/docs/tutorials/using_ray_with_the_ensemble.ipynb b/docs/tutorials/using_ray_with_the_ensemble.ipynb
index b19ca28f..93f1bf8f 100644
--- a/docs/tutorials/using_ray_with_the_ensemble.ipynb
+++ b/docs/tutorials/using_ray_with_the_ensemble.ipynb
@@ -151,7 +151,7 @@
    "ens = Ensemble()\n",
    "ens.from_dataset(\"s82_qso\")\n",
    "ens.source = ens.source.repartition(npartitions=10)\n",
-   "ens.batch(calc_sf2, use_map=False)"
+   "ens.batch(calc_sf2, use_map=False).compute()"
   ]
  }
 ],
@@ -171,7 +171,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.10.13"
+  "version": "3.10.11"
  },
 "vscode": {
  "interpreter": {
diff --git a/docs/tutorials/working_with_the_ensemble.ipynb b/docs/tutorials/working_with_the_ensemble.ipynb
index 5908fa58..1e43d3de 100644
--- a/docs/tutorials/working_with_the_ensemble.ipynb
+++ b/docs/tutorials/working_with_the_ensemble.ipynb
@@ -552,7 +552,7 @@
    "# using tape analysis functions\n",
    "from tape.analysis import calc_stetson_J\n",
    "\n",
-   "res = ens.batch(calc_stetson_J, compute=True) # compute is set to true to execute immediately (non-lazily)\n",
+   "res = ens.batch(calc_stetson_J)\n",
    "res"
   ]
  },
@@ -598,7 +598,7 @@
  "metadata": {},
  "outputs": [],
  "source": [
-   "res = ens.batch(calc_stetson_J, compute=True, label=\"stetson_j\")\n",
+   "res = ens.batch(calc_stetson_J, label=\"stetson_j\")\n",
    "\n",
    "ens.select_frame(\"stetson_j\").compute()"
  ]
@@ -708,7 +708,7 @@
    "print(\"Updated object table is now dirty: \" + str(ens.object.is_dirty()))\n",
    "\n",
    "print(\"Length of the Source table before the batch operation: \" + str(len(ens.source)))\n",
-   "res = ens.batch(calc_stetson_J, compute=True)\n",
+   "res = ens.batch(calc_stetson_J).compute()\n",
    "print(\"Post-computation object table is now dirty: \" + str(ens.object.is_dirty()))\n",
    "print(\"Length of the Source table after the batch operation: \" + str(len(ens.source)))\n",
    "res"
@@ -751,8 +751,8 @@
    "\n",
    "extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())\n",
    "# band_to_calc=None will ignore the band column and use all sources for each object\n",
-   "res = ens.batch(extractor, compute=True, band_to_calc=\"g\")\n",
-   "res"
+   "res = ens.batch(extractor, band_to_calc=\"g\")\n",
"res.compute()" ] }, { @@ -814,8 +814,8 @@ "outputs": [], "source": [ "# Applying the function to the ensemble\n", - "res = ens.batch(my_flux_average, \"flux\", \"band\", compute=True, meta=None, method=\"median\")\n", - "res" + "res = ens.batch(my_flux_average, \"flux\", \"band\", meta=None, method=\"median\")\n", + "res.compute()" ] }, { @@ -849,7 +849,7 @@ ], "metadata": { "kernelspec": { - "display_name": "py310", + "display_name": "Python 3", "language": "python", "name": "python3" }, @@ -863,11 +863,11 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.10.11" }, "vscode": { "interpreter": { - "hash": "08968836a6367873274ed1d5e98a07391f42fc3a62bd5aba54afbd7b11ba8673" + "hash": "83afbb17b435d9bf8b0d0042367da76f26510da1c5781f0ff6e6c518eab621ec" } } }, diff --git a/pyproject.toml b/pyproject.toml index 3e7021bc..a81dd19f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ classifiers = [ dynamic=["version"] dependencies = [ 'pandas', - 'numpy<=1.23.5', + 'numpy', 'dask>=2023.6.1', 'dask[distributed]', 'pyarrow', diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index f10935b1..056e8813 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1008,7 +1008,7 @@ def bin_sources( self.source.set_dirty(True) return self - def batch(self, func, *args, meta=None, use_map=True, compute=True, on=None, sort_by_time=True, label="", **kwargs): + def batch(self, func, *args, meta=None, by_band=False, use_map=True, on=None, sort_by_time=True, label="", **kwargs): """Run a function from tape.TimeSeries on the available ids Parameters @@ -1032,6 +1032,17 @@ def batch(self, func, *args, meta=None, use_map=True, compute=True, on=None, sor the results. Overridden by TAPE for TAPE and `light-curve` functions. If none, attempts to coerce the result to a pandas.Series. + by_band: `boolean`, optional + If true, the lightcurves are split into separate inputs for each + band and passed along to the function individually. If the band + column is already specified in `on` then `batch` will ensure the + band column is the final element in `on`. For all original columns + outputted by `func`, by_band will generate a set of new columns per + band (for example, a function with output column "result" will + instead have "result_g" and "result_r" as columns if the data had g + and r band data) If False (default), the full lightcurve is passed + along to the function (assuming the band column in not already part + of `on`) use_map : `boolean` Determines whether `dask.dataframe.DataFrame.map_partitions` is used (True). Using map_partitions is generally more efficient, but @@ -1039,13 +1050,11 @@ def batch(self, func, *args, meta=None, use_map=True, compute=True, on=None, sor partition. This can be checked using `Ensemble.check_lightcurve_cohesion`. If False, a groupby will be performed instead. - compute: `boolean` - Determines whether to compute the result immediately or hold for a - later compute call. - on: 'str' or 'list' + on: 'str' or 'list', optional Designates which column(s) to groupby. Columns may be from the - source or object tables. For TAPE and `light-curve` functions - this is populated automatically. + source or object tables. If not specified, then the id column is + used by default. For TAPE and `light-curve` functions this is + populated automatically. label: 'str', optional If provided the ensemble will use this label to track the result dataframe. 
             If not provided, a label of the form "result_{x}" where x
@@ -1087,6 +1096,7 @@ def s2n_inter_quartile_range(flux, err):
         amplitudes = ens.batch(np.ptp, ens._flux_col)
         ```
         """
+        self._lazy_sync_tables(table="all")

         # Convert light-curve package feature into analysis function
@@ -1110,6 +1120,14 @@ def s2n_inter_quartile_range(flux, err):
         if isinstance(on, str):
             on = [on]  # Convert to list if only one column is passed

+        if by_band:
+            if self._band_col not in on:
+                on += [self._band_col]
+            elif on[-1] != self._band_col:
+                # Ensure band is the final column in the `on` list
+                on[on.index(self._band_col)] = on[-1]
+                on[-1] = self._band_col
+
         # Handle object columns to group on
         source_cols = list(self.source.columns)
         object_cols = list(self.object.columns)
@@ -1147,11 +1165,11 @@ def convert_and_sort(y, sort, id_col, time_col):
         if use_map:  # use map_partitions
             id_col = self._id_col  # need to grab this before mapping
             batch = source_to_batch.map_partitions(
-                lambda x: x.groupby(on, group_keys=False).apply(
+                lambda x: x.groupby(on, group_keys=True).apply(
                     lambda y: func(
                         *convert_and_sort(y, sort_by_time, id_col, time_col),
                         **kwargs,
-                    )
+                    ),
                 ),
                 meta=meta,
             )
@@ -1163,6 +1181,9 @@ def convert_and_sort(y, sort, id_col, time_col):
                 meta=meta,
             )

+        # Output standardization
+        batch = self._standardize_batch(batch, on, by_band)
+
         # Inherit divisions if known from source and the resulting index is the id
         # Groupby on index should always return a subset that adheres to the same divisions criteria
         if self.source.known_divisions and batch.index.name == self._id_col:
@@ -1175,10 +1196,64 @@ def convert_and_sort(y, sort, id_col, time_col):
         # Track the result frame under the provided label
         self.add_frame(batch, label)

-        if compute:
-            return batch.compute()
+        return batch
+
+    def _standardize_batch(self, batch, on, by_band):
+        """Standardizes the output of a batch result"""
+
+        # Do some up front type checking
+        if isinstance(batch, EnsembleSeries):
+            # make sure the output is separated from the id column
+            if batch.name == self._id_col:
+                batch = batch.rename("result")
+            res_cols = [batch.name]  # grab the series name to use as a column label
+
+            # convert the series to an EnsembleFrame object
+            batch = EnsembleFrame.from_dask_dataframe(batch.to_frame())
+
+        elif isinstance(batch, EnsembleFrame):
+            # collect output columns
+            res_cols = list(batch._meta.columns)
+        else:
-            return batch
+            # unclear if there's really a pathway to trigger this, but added for completeness
+            raise TypeError(
+                f"The output type of batch ({type(batch)}) does not match any of the expected types: (EnsembleFrame, EnsembleSeries)"
+            )
+
+        # Handle formatting for multi-index results
+        if len(on) > 1:
+            batch = batch.reset_index()
+
+            # Need to overwrite the meta manually as the multiindex will be
+            # interpreted by dask as a single "index" column
+            batch._meta = TapeFrame(columns=on + res_cols)
+
+        # Further reformatting for per-band results
+        # Pivots on the band column to generate a result column for each
+        # photometric band.
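+        # For example, a function output column "result" becomes "result_g"
+        # and "result_r" when the data contains g and r bands (an illustrative
+        # note mirroring the `by_band` docstring; labels depend on the data).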
+        if by_band:
+            batch = batch.categorize(self._band_col)
+            batch = batch.pivot_table(index=on[0], columns=self._band_col, aggfunc="sum")
+
+            # Need to once again reestablish meta for the pivot
+            band_labels = batch.columns.values
+            out_cols = []
+            # To align with pandas pivot_table results, the columns should be generated in reverse order
+            for col in res_cols[::-1]:
+                for band in band_labels:
+                    out_cols += [(str(col), str(band))]
+            batch._meta = TapeFrame(columns=out_cols)  # apply new meta
+
+            # Flatten the columns to a new column per band
+            batch.columns = ["_".join(col) for col in batch.columns.values]
+
+            # The pivot returns a dask dataframe, need to convert back
+            batch = EnsembleFrame.from_dask_dataframe(batch)
+        else:
+            batch = batch.set_index(on[0], sort=False)
+
+        return batch

     def from_pandas(
         self,
@@ -1880,7 +1955,7 @@ def _build_index(self, obj_id, band):
         index = pd.MultiIndex.from_tuples(tuples, names=["object_id", "band", "index"])
         return index

-    def sf2(self, sf_method="basic", argument_container=None, use_map=True, compute=True):
+    def sf2(self, sf_method="basic", argument_container=None, use_map=True):
         """Wrapper interface for calling structurefunction2 on the ensemble

         Parameters
@@ -1924,9 +1999,7 @@
         )

         else:
-            result = self.batch(
-                calc_sf2, use_map=use_map, argument_container=argument_container, compute=compute
-            )
+            result = self.batch(calc_sf2, use_map=use_map, argument_container=argument_container)

         # Inherit divisions information if known
         if self.source.known_divisions and self.object.known_divisions:
diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py
index 4e580fdc..0765b1db 100644
--- a/tests/tape_tests/test_ensemble.py
+++ b/tests/tape_tests/test_ensemble.py
@@ -318,16 +318,16 @@ def test_from_rrl_dataset(dask_client):
     # larger dataset, let's just use a subset
     ens.prune(350)

-    res = ens.batch(calc_stetson_J)
+    res = ens.batch(calc_stetson_J).compute()

-    assert 377927 in res.index  # find a specific object
+    assert 377927 in res.index.values  # find a specific object

     # Check Stetson J results for a specific object
-    assert res[377927]["g"] == pytest.approx(9.676014, rel=0.001)
-    assert res[377927]["i"] == pytest.approx(14.22723, rel=0.001)
-    assert res[377927]["r"] == pytest.approx(6.958200, rel=0.001)
-    assert res[377927]["u"] == pytest.approx(9.499280, rel=0.001)
-    assert res[377927]["z"] == pytest.approx(14.03794, rel=0.001)
+    assert res.loc[377927][0]["g"] == pytest.approx(9.676014, rel=0.001)
+    assert res.loc[377927][0]["i"] == pytest.approx(14.22723, rel=0.001)
+    assert res.loc[377927][0]["r"] == pytest.approx(6.958200, rel=0.001)
+    assert res.loc[377927][0]["u"] == pytest.approx(9.499280, rel=0.001)
+    assert res.loc[377927][0]["z"] == pytest.approx(14.03794, rel=0.001)


 def test_from_qso_dataset(dask_client):
@@ -341,16 +341,16 @@
     # larger dataset, let's just use a subset
     ens.prune(650)

-    res = ens.batch(calc_stetson_J)
+    res = ens.batch(calc_stetson_J).compute()

-    assert 1257836 in res  # find a specific object
+    assert 1257836 in res.index.values  # find a specific object

     # Check Stetson J results for a specific object
-    assert res.loc[1257836]["g"] == pytest.approx(411.19885, rel=0.001)
-    assert res.loc[1257836]["i"] == pytest.approx(86.371310, rel=0.001)
-    assert res.loc[1257836]["r"] == pytest.approx(133.56796, rel=0.001)
-    assert res.loc[1257836]["u"] == pytest.approx(231.93229, rel=0.001)
-    assert res.loc[1257836]["z"] == pytest.approx(53.013018, rel=0.001)
+    assert res.loc[1257836][0]["g"] == pytest.approx(411.19885, rel=0.001)
+    assert res.loc[1257836][0]["i"] == pytest.approx(86.371310, rel=0.001)
+    assert res.loc[1257836][0]["r"] == pytest.approx(133.56796, rel=0.001)
+    assert res.loc[1257836][0]["u"] == pytest.approx(231.93229, rel=0.001)
+    assert res.loc[1257836][0]["z"] == pytest.approx(53.013018, rel=0.001)


 def test_read_rrl_dataset(dask_client):
@@ -363,16 +363,16 @@
     # larger dataset, let's just use a subset
     ens.prune(350)

-    res = ens.batch(calc_stetson_J)
+    res = ens.batch(calc_stetson_J).compute()

-    assert 377927 in res.index  # find a specific object
+    assert 377927 in res.index.values  # find a specific object

     # Check Stetson J results for a specific object
-    assert res[377927]["g"] == pytest.approx(9.676014, rel=0.001)
-    assert res[377927]["i"] == pytest.approx(14.22723, rel=0.001)
-    assert res[377927]["r"] == pytest.approx(6.958200, rel=0.001)
-    assert res[377927]["u"] == pytest.approx(9.499280, rel=0.001)
-    assert res[377927]["z"] == pytest.approx(14.03794, rel=0.001)
+    assert res.loc[377927][0]["g"] == pytest.approx(9.676014, rel=0.001)
+    assert res.loc[377927][0]["i"] == pytest.approx(14.22723, rel=0.001)
+    assert res.loc[377927][0]["r"] == pytest.approx(6.958200, rel=0.001)
+    assert res.loc[377927][0]["u"] == pytest.approx(9.499280, rel=0.001)
+    assert res.loc[377927][0]["z"] == pytest.approx(14.03794, rel=0.001)


 def test_read_qso_dataset(dask_client):
@@ -385,16 +385,16 @@
     # larger dataset, let's just use a subset
     ens.prune(650)

-    res = ens.batch(calc_stetson_J)
+    res = ens.batch(calc_stetson_J).compute()

-    assert 1257836 in res  # find a specific object
+    assert 1257836 in res.index.values  # find a specific object

     # Check Stetson J results for a specific object
-    assert res.loc[1257836]["g"] == pytest.approx(411.19885, rel=0.001)
-    assert res.loc[1257836]["i"] == pytest.approx(86.371310, rel=0.001)
-    assert res.loc[1257836]["r"] == pytest.approx(133.56796, rel=0.001)
-    assert res.loc[1257836]["u"] == pytest.approx(231.93229, rel=0.001)
-    assert res.loc[1257836]["z"] == pytest.approx(53.013018, rel=0.001)
+    assert res.loc[1257836][0]["g"] == pytest.approx(411.19885, rel=0.001)
+    assert res.loc[1257836][0]["i"] == pytest.approx(86.371310, rel=0.001)
+    assert res.loc[1257836][0]["r"] == pytest.approx(133.56796, rel=0.001)
+    assert res.loc[1257836][0]["u"] == pytest.approx(231.93229, rel=0.001)
+    assert res.loc[1257836][0]["z"] == pytest.approx(53.013018, rel=0.001)


 def test_from_source_dict(dask_client):
@@ -1624,13 +1624,15 @@ def test_batch(data_fixture, request, use_map, on):
     result = (
         parquet_ensemble.prune(10)
         .dropna(table="source")
-        .batch(calc_stetson_J, use_map=use_map, on=on, band_to_calc=None, compute=False, label="stetson_j")
+        .batch(calc_stetson_J, use_map=use_map, on=on, band_to_calc=None, label="stetson_j")
     )

     # Validate that the ensemble is now tracking a new result frame.
     assert len(parquet_ensemble.frames) == frame_cnt + 1
     tracked_result = parquet_ensemble.select_frame("stetson_j")
-    assert isinstance(tracked_result, EnsembleSeries)
+
+    print(tracked_result)
+    assert isinstance(tracked_result, EnsembleFrame)
     assert result is tracked_result

     # Make sure that divisions information is propagated if known
@@ -1640,8 +1642,9 @@
     result = result.compute()

     if on is None:
-        assert pytest.approx(result.values[0]["g"], 0.001) == -0.04174282
-        assert pytest.approx(result.values[0]["r"], 0.001) == 0.6075282
+        print(result.values[0])
+        assert pytest.approx(result.values[0][0]["g"], 0.001) == -0.04174282
+        assert pytest.approx(result.values[0][0]["r"], 0.001) == 0.6075282
     elif on is ["ps1_objid", "filterName"]:  # In the case where we group on id and band, the structure changes
         assert pytest.approx(result.values[1]["r"], 0.001) == 0.6075282
         assert pytest.approx(result.values[0]["g"], 0.001) == -0.04174282
@@ -1650,6 +1653,75 @@
         assert pytest.approx(result.values[1]["r"], 0.001) == -0.49639028


+@pytest.mark.parametrize("on", [None, ["ps1_objid", "filterName"], ["filterName", "ps1_objid"]])
+@pytest.mark.parametrize("func_label", ["mean", "bounds"])
+def test_batch_by_band(parquet_ensemble, func_label, on):
+    """
+    Test that ensemble.batch(by_band=True) works as intended.
+    """
+
+    if func_label == "mean":
+
+        def my_mean(flux):
+            """returns a single value"""
+            return np.mean(flux)
+
+        res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=True)
+
+        parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
+        filter_res = parquet_ensemble.batch(my_mean, parquet_ensemble._flux_col, on=on, by_band=False)
+
+        # An EnsembleFrame should be returned
+        assert isinstance(res, EnsembleFrame)
+
+        # Make sure we get all the expected columns
+        assert all([col in res.columns for col in ["result_g", "result_r"]])
+
+        # These should be equivalent
+        assert (
+            res.loc[88472935274829959]["result_g"]
+            .compute()
+            .equals(filter_res.loc[88472935274829959]["result"].compute())
+        )
+
+    elif func_label == "bounds":
+
+        def my_bounds(flux):
+            """returns a series"""
+            return pd.Series({"min": np.min(flux), "max": np.max(flux)})
+
+        res = parquet_ensemble.batch(
+            my_bounds, "psFlux", on=on, by_band=True, meta={"min": float, "max": float}
+        )
+
+        parquet_ensemble.source.query(f"{parquet_ensemble._band_col}=='g'").update_ensemble()
+        filter_res = parquet_ensemble.batch(
+            my_bounds, "psFlux", on=on, by_band=False, meta={"min": float, "max": float}
+        )
+
+        # An EnsembleFrame should be returned
+        assert isinstance(res, EnsembleFrame)
+
+        # Make sure we get all the expected columns
+        assert all([col in res.columns for col in ["max_g", "max_r", "min_g", "min_r"]])
+
+        # These should be equivalent
+        assert (
+            res.loc[88472935274829959]["max_g"]
+            .compute()
+            .equals(filter_res.loc[88472935274829959]["max"].compute())
+        )
+        assert (
+            res.loc[88472935274829959]["min_g"]
+            .compute()
+            .equals(filter_res.loc[88472935274829959]["min"].compute())
+        )
+
+    # Meta should reflect the actual columns, this can get out of sync
+    # whenever multi-indexes are involved, which batch tries to handle
+    assert all([col in res.columns for col in res.compute().columns])
+
+
 def test_batch_labels(parquet_ensemble):
     """
     Test that ensemble.batch() generates unique labels for result frames when none are provided.
@@ -1714,7 +1786,7 @@ def test_batch_with_custom_series_meta(parquet_ensemble, custom_meta):
     assert len(parquet_ensemble.frames) == num_frames + 1
     assert len(parquet_ensemble.select_frame("flux_mean")) > 0
-    assert isinstance(parquet_ensemble.select_frame("flux_mean"), EnsembleSeries)
+    assert isinstance(parquet_ensemble.select_frame("flux_mean"), EnsembleFrame)


 @pytest.mark.parametrize(
@@ -1811,7 +1883,7 @@ def test_sf2(data_fixture, request, method, combine, sthresh, use_map=False):
     arg_container.bin_count_target = sthresh

     if not combine:
-        res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map, compute=False)
+        res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map)
     else:
         res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map)
         res_batch = parquet_ensemble.batch(calc_sf2, use_map=use_map, argument_container=arg_container)
@@ -1821,10 +1893,9 @@
         assert res_sf2.known_divisions

     if combine:
-        assert not res_sf2.equals(res_batch)  # output should be different
+        assert not res_sf2.equals(res_batch.compute())  # output should be different
     else:
-        res_sf2 = res_sf2.compute()
-        assert res_sf2.equals(res_batch)  # output should be identical
+        assert res_sf2.compute().equals(res_batch.compute())  # output should be identical


 @pytest.mark.parametrize("sf_method", ["basic", "macleod_2012", "bauer_2009a", "bauer_2009b", "schmidt_2010"])
@@ -1839,7 +1910,7 @@ def test_sf2_methods(parquet_ensemble, sf_method, use_map=False):
     arg_container.bin_count_target = 50
     arg_container.sf_method = sf_method

-    res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map)
-    res_batch = parquet_ensemble.batch(calc_sf2, use_map=use_map, argument_container=arg_container)
+    res_sf2 = parquet_ensemble.sf2(argument_container=arg_container, use_map=use_map).compute()
+    res_batch = parquet_ensemble.batch(calc_sf2, use_map=use_map, argument_container=arg_container).compute()

     assert res_sf2.equals(res_batch)  # output should be identical
diff --git a/tests/tape_tests/test_feature_extraction.py b/tests/tape_tests/test_feature_extraction.py
index 6fdd394b..73aaa3b5 100644
--- a/tests/tape_tests/test_feature_extraction.py
+++ b/tests/tape_tests/test_feature_extraction.py
@@ -51,7 +51,7 @@ def test_multiple_features_with_ensemble(dask_client):
     result = ens.batch(
         extractor,
         band_to_calc="g",
-    )
+    ).compute()

     assert result.shape == (2, 3)
     assert_array_equal(result.columns, ["anderson_darling_normal", "inter_percentile_range_25", "stetson_K"])
@@ -84,7 +84,7 @@ def test_otsu_with_ensemble_all_bands(dask_client):
     result = ens.batch(
         licu.OtsuSplit(),
         band_to_calc=None,
-    )
+    ).compute()

     assert result.shape == (2, 4)
     assert_array_equal(