diff --git a/docs/_static/tutorial_images/ensemble_api_structure.png b/docs/_static/tutorial_images/ensemble_api_structure.png new file mode 100644 index 00000000..53e24686 Binary files /dev/null and b/docs/_static/tutorial_images/ensemble_api_structure.png differ diff --git a/docs/_static/tutorial_images/ensemble_frame_hierarchy.png b/docs/_static/tutorial_images/ensemble_frame_hierarchy.png new file mode 100644 index 00000000..33a3e5b7 Binary files /dev/null and b/docs/_static/tutorial_images/ensemble_frame_hierarchy.png differ diff --git a/docs/tutorials/batch_showcase.ipynb b/docs/tutorials/batch_showcase.ipynb index f5f9973f..9bb6f5d2 100644 --- a/docs/tutorials/batch_showcase.ipynb +++ b/docs/tutorials/batch_showcase.ipynb @@ -30,11 +30,6 @@ "import sys" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [] - }, { "cell_type": "code", "execution_count": null, @@ -550,12 +545,27 @@ "res_otsu.compute()" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Using light-curve package features\n", + "`Ensemble.batch()` also supports the use of [light-curve](https://pypi.org/project/light-curve/) package feature extractors:" + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "import light_curve as licu\n", + "\n", + "extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())\n", + "# band_to_calc=\"g\" uses only g-band sources; band_to_calc=None would ignore the band column and use all sources for each object\n", + "res = ens.batch(extractor, band_to_calc=\"g\")\n", + "res.compute()" + ] } ], "metadata": { @@ -574,7 +584,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.11" + "version": "3.10.13" }, "vscode": { "interpreter": { diff --git a/docs/tutorials/working_with_the_ensemble.ipynb b/docs/tutorials/working_with_the_ensemble.ipynb index d2969c5d..b56f8961 100644 --- a/docs/tutorials/working_with_the_ensemble.ipynb +++ b/docs/tutorials/working_with_the_ensemble.ipynb @@ -5,7 +5,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Working with the TAPE `Ensemble` object" + "# Working with the TAPE Ensemble" ] }, { @@ -13,8 +13,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "When working with many lightcurves, the TAPE `Ensemble` object serves as a singular interface for storing, filtering, and analyzing timeseries data. \n", - "Let's consider an example set of lightcurves, generated as follows:" + "When working with many lightcurves, the `TAPE` `Ensemble` object serves as a singular interface for storing, filtering, and analyzing timeseries data.\n", + "\n", + "Let's create a small example `Ensemble` from the Stripe 82 RR Lyrae dataset."
] }, { @@ -22,118 +23,10 @@ "execution_count": null, "metadata": {}, "outputs": [], - "source": [ - "import numpy as np\n", - "import pandas as pd\n", - "\n", - "np.random.seed(1)\n", - "\n", - "# Generate 10 astronomical objects\n", - "n_obj = 10\n", - "ids = 8000 + np.arange(n_obj)\n", - "names = ids.astype(str)\n", - "object_table = pd.DataFrame(\n", - " {\n", - " \"id\": ids,\n", - " \"name\": names,\n", - " \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n", - " \"libid_cadence\": np.random.randint(1, 130, n_obj),\n", - " }\n", - ")\n", - "\n", - "# Create 1000 lightcurves with 100 measurements each\n", - "lc_len = 100\n", - "num_points = 1000\n", - "all_bands = np.array([\"r\", \"g\", \"b\", \"i\"])\n", - "source_table = pd.DataFrame(\n", - " {\n", - " \"id\": 8000 + (np.arange(num_points) % n_obj),\n", - " \"time\": np.arange(num_points),\n", - " \"flux\": np.random.random_sample(size=num_points) * 10,\n", - " \"band\": np.repeat(all_bands, num_points / len(all_bands)),\n", - " \"error\": np.random.random_sample(size=num_points),\n", - " \"count\": np.arange(num_points),\n", - " },\n", - ")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can load these into the `Ensemble` using `Ensemble.from_pandas()`:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.125402Z", - "start_time": "2023-08-30T14:58:34.190790Z" - } - }, - "outputs": [], "source": [ "from tape.ensemble import Ensemble\n", "\n", - "ens = Ensemble() # initialize an ensemble object\n", - "\n", - "# Read in the generated lightcurve data\n", - "ens.from_pandas(\n", - " source_frame=source_table,\n", - " object_frame=object_table,\n", - " id_col=\"id\",\n", - " time_col=\"time\",\n", - " flux_col=\"flux\",\n", - " err_col=\"error\",\n", - " band_col=\"band\",\n", - " npartitions=1,\n", - " sort=True,\n", - ")" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We now have an `Ensemble` object, and have provided it with the constructed data in the source dictionary. Within the call to `Ensemble.from_pandas`, we specified which columns of the input file mapped to timeseries quantities that the `Ensemble` needs to understand. It's important to link these arguments properly, as the `Ensemble` will use these columns when operations are requested on understood quantities. For example, if a TAPE analysis function requires the time column, from this linking the `Ensemble` will automatically supply that function with the 'time' column." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Column Mapping with the ColumnMapper\n", - "\n", - "In the above example, we manually provide the column labels within the call to `Ensemble.from_pandas`. Alternatively, the `tape.utils.ColumnMapper` class offers a means to assign the column mappings. Either manually as shown before, or even populated from a known mapping scheme." 
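+ "# Load TAPE's example Stripe 82 RR Lyrae dataset by name\n",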
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.209050Z", - "start_time": "2023-08-30T14:58:36.115521Z" - } - }, - "outputs": [], - "source": [ - "from tape.utils import ColumnMapper\n", - "\n", - "# columns assigned manually\n", - "col_map = ColumnMapper().assign(\n", - " id_col=\"id\", time_col=\"time\", flux_col=\"flux\", err_col=\"error\", band_col=\"band\"\n", - ")\n", - "\n", - "# Pass the ColumnMapper along to from_pandas\n", - "ens.from_pandas(\n", - " source_frame=source_table, object_frame=object_table, column_mapper=col_map, npartitions=1, sort=True\n", - ")" + "ens = Ensemble().from_dataset(\"s82_rrlyrae\", sort=True)" ] }, { @@ -142,164 +35,16 @@ "metadata": {}, "source": [ "## The Object and Source Frames\n", - "The `Ensemble` maintains two dataframes under the hood, the \"object dataframe\" and the \"source dataframe\". This borrows from the Rubin Observatories object-source convention, where object denotes a given astronomical object and source is the collection of measurements of that object. Essentially, the Object frame stores one-off information about objects, and the source frame stores the available time-domain data. As a result, `Ensemble` functions that operate on the underlying dataframes need to be pointed at either object or source. In most cases, the default is the object table as it's a more helpful interface for understanding the contents of the `Ensemble`, especially when dealing with large volumes of data.\n", + "The `Ensemble` is an interface for tracking and manipulating a collection of dataframes. When first initialized, an `Ensemble` tracks two tables (though additional tables can be added to the `Ensemble`), the \"Object dataframe\" and the \"Source dataframe\".\n", "\n", - "We can also access Ensemble frames individually with `Ensemble.source` and `Ensemble.object`" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Dask and \"Lazy Evaluation\"\n", - "\n", - "Before going any further, the `Ensemble` is built on top of `Dask`, which brings with it a powerful framework for parallelization and scalability. However, there are some differences in how `Dask` code works that, if you're unfamiliar with it, is worth establishing right here at the get-go. The first is that `Dask` evaluates code \"lazily\". Meaning that many operations are not executed when the line of code is run, but instead are added to a scheduler to be executed when the result is actually needed. See below for an example:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.219081Z", - "start_time": "2023-08-30T14:58:36.205629Z" - } - }, - "outputs": [], - "source": [ - "ens.source # We have not actually loaded any data into memory" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Here we are accessing the Dask dataframe and despite running a command to read in our source data, we only see an empty dataframe with some high-level information available. To explicitly bring the data into memory, we must run a `compute()` command on the data's frame."
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.484627Z", - "start_time": "2023-08-30T14:58:36.213215Z" - } - }, - "outputs": [], - "source": [ - "ens.source.compute() # Compute lets dask know we're ready to bring the data into memory" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "With this compute, we see above that we have returned a populated dataframe (a Pandas dataframe in fact!). From this, many workflows in Dask and by extension TAPE, will look like a series of lazily evaluated commands that are chained together and then executed with a .compute() call at the end of the workflow.\n", + "This borrows from the Rubin Observatory's object-source convention, where object denotes a given astronomical object and source is the collection of measurements of that object. Essentially, the Object frame stores one-off information about objects, and the Source frame stores the available time-domain data.\n", "\n", - "Alternatively we can use `ens.persist()` to execute the chained commands without loading the result into memory. This can speed up future `compute()` calls.\n", + "The dataframes tracked by the `Ensemble` are `EnsembleFrames` (of which the Source and Object tables are special cases).\n", "\n", - "Note that `Ensemble.source` and `Ensemble.object` are instances of the `tape.SourceFrame` and `tape.ObjectFrame` classes respectively. These are subclasses of Dask dataframes that provide some additional utility for tracking by the ensemble while supporting most of the Dask dataframe API. " - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Inspection and Filtering\n", - "\n", - "The `Ensemble` contains an assortment of functions for inspecting and filtering your data." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Inspection\n", - "\n", - "These functions provide views into the contents of your `Ensemble` dataframe, especially important when dealing with large data volumes that cannot be brought into memory all at once. The first is `Ensemble.info` which provides information on the columns, data types, and memory usage of the dataframe." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.696142Z", - "start_time": "2023-08-30T14:58:36.361967Z" - } - }, - "outputs": [], - "source": [ - "# Inspection\n", - "\n", - "ens.info(verbose=True, memory_usage=True) # Grabs high level information about the dataframes" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "`Ensemble.info` shows that we have 2000 rows and the the memory they use, and it also shows the columns we've brought in with their respective data types. If you'd like to actually bring a few rows into memory to inspect, `EnsembleFrame.head` and `EnsembleFrame.tail` provide access to the first n and last n rows respectively."
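+ "For a quick look at what each frame holds, we can pull a few rows of each into memory (a small sketch; `head()` computes and returns only the first few rows):\n", + "\n", + "```python\n", + "ens.object.head(2)  # one row per object: per-object information\n", + "ens.source.head(2)  # one row per measurement: times, fluxes, errors, bands\n", + "```\n", + "\n",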
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.696879Z", - "start_time": "2023-08-30T14:58:36.510953Z" - } - }, - "outputs": [], - "source": [ - "ens.object.head(5) # Grabs the first 5 rows of the object table" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.697259Z", - "start_time": "2023-08-30T14:58:36.561399Z" - } - }, - "outputs": [], - "source": [ - "ens.source.tail(5) # Grabs the last 5 rows of the source table" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Additionally, when you are working with a small enough dataset, `Ensemble.compute` can be used to bring the whole dataframe into memory (as shown previously). " - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.697769Z", - "start_time": "2023-08-30T14:58:36.592238Z" - } - }, - "outputs": [], - "source": [ - "ens.source.compute()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Several methods exist to access individual lightcurves within the `Ensemble`. First of which is the `to_timeseries` function. This allows you to supply a given object ID, and returns a `TimeSeries` object (see )." + "
\n", + " \"Visualization\n", + "
\n", + "You can access the individual Source and Object dataframes with `Ensemble.source` and `Ensemble.object` respectively." ] }, { @@ -308,15 +53,7 @@ "metadata": {}, "outputs": [], "source": [ - "ens.to_timeseries(8003).data" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Alternatively, if you aren't interested in a particular lightcurve, you can draw a random one from the `Ensemble` using `Ensemble.select_random_timeseries`." + "ens.source" ] }, { @@ -325,63 +62,27 @@ "metadata": {}, "outputs": [], "source": [ - "ens.select_random_timeseries(seed=1).data" + "ens.object" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Filtering\n", + "## Dask and \"Lazy Evaluation\"\n", "\n", - "The `Ensemble` provides a general filtering function [`query`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html) that mirrors a Pandas or Dask `query` command. Specifically, the function takes a string that provides an expression indicating which rows to **keep**. As with other `Ensemble` functions, an optional `table` parameter allows you to filter on either the object or the source table.\n", + "`TAPE` is built on top of [Dask](https://github.com/dask/dask), a framework for flexible parallelization and data analytics.\n", "\n", - "For example, the following code filters the sources to only include rows with flux values above the median. It uses `ens._flux_col` to retrieve the name of the column with that information." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.698305Z", - "start_time": "2023-08-30T14:58:36.615492Z" - } - }, - "outputs": [], - "source": [ - "highest_flux = ens.source[ens._flux_col].quantile(0.95).compute()\n", - "ens.source.query(f\"{ens._flux_col} < {highest_flux}\").compute()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Alternatively, we could use a Dask dataseries of Booleans to indicate which rows to *keep*. We can often compute these series as the result of some operation on the underlying tables:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.754980Z", - "start_time": "2023-08-30T14:58:36.669055Z" - } - }, - "outputs": [], - "source": [ - "# Find all of the source points with the lowest 90% of errors.\n", - "keep_rows = ens.source[\"error\"] < ens.source[\"error\"].quantile(0.9)\n", - "keep_rows.compute()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We also provide filtering at the `Ensemble` level, so you can pass the above series to the `Ensemble.filter_from_series` function:" + "`TAPE`'s `EnsembleFrame` objects inherit from [Dask dataframes](https://docs.dask.org/en/stable/dataframe.html#dask-dataframe) and support most `Dask` operations.\n", + "\n", + "Note that Source and Object tables are `SourceFrames` and `ObjectFrames` respectively, which are special cases of `EnsembleFrames`.\n", + "
\n", + " \"EnsembleFrame\n", + "
\n", + "\n", + "An important feature of `Dask` is that it evaluates code \"lazily\". This means that many operations are not executed when the line of code is run, but instead are added to a scheduler to be executed when the result is actually needed.\n", + "\n", + "As an example:" ] }, { @@ -389,21 +90,20 @@ "execution_count": null, "metadata": { "ExecuteTime": { - "end_time": "2023-08-30T14:58:36.792088Z", - "start_time": "2023-08-30T14:58:36.690772Z" + "end_time": "2023-08-30T14:58:36.219081Z", + "start_time": "2023-08-30T14:58:36.205629Z" } }, "outputs": [], "source": [ - "ens.filter_from_series(keep_rows, table=\"source\")\n", - "ens.source.compute()" + "ens.source # We have not actually loaded any data into memory" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Additionally, several more specific functions are available for common operations." + "When accessing the Source dataframe above, we only saw an empty dataframe with some high-level information about its schema. To explicitly bring the data into memory, we must run a `compute()` command on the dataframe." ] }, { @@ -411,20 +111,13 @@ "execution_count": null, "metadata": { "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.026887Z", - "start_time": "2023-08-30T14:58:36.715537Z" + "end_time": "2023-08-30T14:58:36.484627Z", + "start_time": "2023-08-30T14:58:36.213215Z" } }, "outputs": [], "source": [ - "# Cleaning nans\n", - "ens.source.dropna() # clean nans from source table\n", - "ens.object.dropna() # clean nans from object table\n", - "\n", - "# Filtering on number of observations\n", - "ens.prune(threshold=10) # threshold is the minimum number of observations needed to retain the object\n", - "\n", - "ens.info(verbose=True)" + "ens.source.compute() # Compute lets Dask know we're ready to bring the data into memory" ] }, { @@ -432,66 +125,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "In the above operations, we remove any rows that have at least 1 NaN value present. And then filter such that only lightcurves which have at least 50 measurements are retained." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Sampling\n", + "With this `compute()`, we returned a populated dataframe. \n", "\n", - "In addition to filtering by specific constraints, it's possible to select a subset of your data to work with. `Ensemble.sample` will randomly select a fraction of objects from the full object list. This will return a new\n", - "ensemble object to work with." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "subset_ens = ens.sample(frac=0.5) # select ~half of the objects\n", - "\n", - "print(\"Number of pre-sampled objects: \", len(ens.object))\n", - "print(\"Number of post-sampled objects: \", len(subset_ens.object))" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Column Selection\n", - "\n", - "The `Ensemble` provides a `select` function to filter down to a subset of columns. For example, let's remove a column from the source table." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# We currently have 5 columns\n", - "ens.source" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.096860Z", - "start_time": "2023-08-30T14:58:36.937579Z" - } - }, - "outputs": [], - "source": [ - "# We can use select to just grab a subset of columns\n", - "ens.select([\"time\", \"flux\", \"error\", \"band\"], table=\"source\")\n", - "ens.source" ] }, { @@ -506,9 +142,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "The `Ensemble` is a manager of `EnsembleFrame` objects (of which `Ensemble.source` and `Ensemble.object` are special cases). When performing operations on one of the tables, the results are not automatically sent to the `Ensemble`.\n", + "As discussed above, an `Ensemble` is a manager of `EnsembleFrame` objects (and `Ensemble.source` and `Ensemble.object` are special cases). When performing most operations on one of these tables, the results are not automatically propagated back to the `Ensemble`.\n", "\n", - "So while in the above examples we demonstrate several methods where we generated filtered views of the source table, note that the underlying data remained unchanged, with no changes to the rows or columns of `Ensemble.source`" + "Here we filter `Ensemble.source` by its flux column (see more examples of filtering with these `Dask`/`Pandas`-style operations in [Common Data Operations with TAPE](https://tape.readthedocs.io/en/latest/tutorials/common_data_operations.html#Filtering)), but note that the rows of `Ensemble.source` remain unchanged." ] }, { @@ -517,9 +153,9 @@ "metadata": {}, "outputs": [], "source": [ - "queried_src = ens.source.query(f\"{ens._flux_col} < {highest_flux}\")\n", + "filtered_src = ens.source.query(f\"{ens._flux_col} > 15\")\n", "\n", - "print(len(queried_src))\n", + "print(len(filtered_src))\n", "print(len(ens.source))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "When modifying the views of a dataframe tracked by the `Ensemble`, we can update the `Source` or `Object` frame to use the updated view by calling\n", + "Most dataframe operations return a result frame that is not yet tracked by the `Ensemble`. When an operation modifies a view of a frame tracked by the `Ensemble`, we can update the Source or Object frame to use the result frame by calling\n", "\n", - "`Ensemble.update_frame(view_frame)`\n", + "`Ensemble.update_frame(filtered_src)`\n", "\n", "Or alternately:\n", "\n", - "`view_frame.update_ensemble()`" + "`filtered_src.update_ensemble()`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# Now apply the views filter to the source frame.\n", - "queried_src.update_ensemble()\n", + "# Now apply the filtered result to the Source frame.\n", + "filtered_src.update_ensemble()\n", "\n", "ens.source.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Note that the above is still a series of lazy operations that will not be fully evaluated until an operation such as `compute`. So a call to `update_ensemble` will not yet alter or move any underlying data."
- ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Assignments and Column Manipulation\n", + "The Source frame now has the number of rows we saw when inspecting the filtered result above.\n", "\n", - "The ensemble object supports assignment through the Dask `assign` function. We can pass in either a callable or a series to assign to the new column. New column names are produced automatically from the argument name.\n", - "\n", - "For example, if we want to compute the lower bound of an error range as the estimated flux minus twice the estimated error, we would use:" + "Note that the above is still a series of lazy operations that will not be fully evaluated until an operation such as `compute()`. So a call to `update_ensemble()` will not yet alter or move any underlying data." ] }, { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.097571Z", - "start_time": "2023-08-30T14:58:36.958927Z" - } - }, - "outputs": [], - "source": [ - "lower_bnd = ens.source.assign(lower_bnd=lambda x: x[\"flux\"] - 2.0 * x[\"error\"])\n", - "lower_bnd" - ] - }, - { - "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ - "## Batch Analysis\n", + "# Storing and Accessing Result Frames\n", + "The `Ensemble` provides a powerful batching interface, `Ensemble.batch()`, to perform analysis functions in parallel across your lightcurves.\n", "\n", - "The `Ensemble` provides a powerful batching interface, `Ensemble.batch`, with in-built parallelization (provided the input data is in multiple partitions). In addition, TAPE has a suite of analysis functions on-hand for your use. Below, we show the application of `tape.analysis.calc_stetson_J` on our dataset." + "For the example below, we use TAPE's included suite of analysis functions to apply `tape.analysis.calc_stetson_J` to our dataset.\n",
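+ "\n", + "As a sketch of the general calling pattern (`my_func` is a hypothetical user function; the positional strings name the `Ensemble` columns mapped to its arguments, extra keyword arguments pass through to the function, and `meta` describes the expected output):\n", + "\n", + "```python\n", + "res = ens.batch(my_func, \"flux\", \"band\", meta=None, method=\"median\")\n", + "```\n",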
+ "\n", + "(For more info on `Ensemble.batch()`, including providing your own custom functions, see the [Ensemble Batch Showcase](https://tape.readthedocs.io/en/latest/tutorials/batch_showcase.html).)" ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.492980Z", - "start_time": "2023-08-30T14:58:36.981314Z" - } - }, + "metadata": {}, "outputs": [], "source": [ - "# using tape analysis functions\n", + "# using TAPE analysis functions\n", "from tape.analysis import calc_stetson_J\n", "\n", "res = ens.batch(calc_stetson_J)\n", @@ -613,18 +220,13 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Storing and Accessing Result Frames" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Note for the above `batch` operation, we also printed:\n", + "Note for the above `batch` operation, we also printed output in the form of\n", "\n", - "`Using generated label, result_1, for a batch result.`\n", + "```\n", + " \"Using generated label, {label}, for a batch result.\"\n", + "```\n", "\n", - "In addition to the source and object frames, the `Ensemble` may track other frames as well, accessed by either generated or user-provided labels.\n", + "In addition to the Source and Object frames, the `Ensemble` may track other frames as well, accessed by either generated or user-provided labels.\n", "\n", "We can access a saved frame with `Ensemble.select_frame(label)`" ] }, @@ -705,12 +307,12 @@ "metadata": {}, "outputs": [], "source": [ - "ens.drop_frame(\"result_1\")\n", + "ens.drop_frame(\"new_res\")\n", "\n", "try:\n", - " ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n", + " ens.select_frame(\"new_res\") # This should result in a KeyError since the frame has been dropped.\n", "except Exception as e:\n", - " print(\"As expected, the frame 'result_1 was dropped.\\n\" + str(e))" + " print(\"As expected, the frame 'new_res' was dropped.\\n\", str(e))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Keeping the Object and Source Tables in Sync\n", "\n", - "The Tape `Ensemble` attempts to lazily \"sync\" the Object and Source tables such that:\n", + "The `TAPE` `Ensemble` attempts to lazily \"sync\" the Object and Source tables such that:\n", "\n", "* If a series of operations removes all lightcurves for a particular object from the Source table, we will lazily remove that object from the Object table.\n", "* If a series of operations removes an object from the Object table, we will lazily remove all light curves for that object from the Source table.\n", "\n", - "As an example let's filter the Object table only for objects observed from deep drilling fields. This operation marks the result table as `dirty` indicating to the `Ensemble` that if used as part of a result computation, it should check if the object and source tables are synced. \n", + "As an example, let's filter the Object table only for objects of type 'ab'. This operation marks the result table as `dirty`, indicating to the `Ensemble` that if used as part of a result computation, it should check if the Object and Source tables are synced.\n",
\n", "\n", "Note that because we have not called `update_ensemble()` the `Ensemble` is still using the original Object table which is **not** marked `dirty`.\n" ] @@ -735,20 +337,20 @@ "metadata": {}, "outputs": [], "source": [ - "ddf_only = ens.object.query(\"ddf_bool == True\")\n", + "type_ab_only = ens.object.query(\"type == 'ab'\")\n", "\n", - "print(\"Object table is dirty: \" + str(ens.object.is_dirty()))\n", - "print(\"ddf_only is dirty: \" + str(ddf_only.is_dirty()))\n", - "ddf_only.compute()" + "print(\"Object table is dirty: \", str(ens.object.is_dirty()))\n", + "print(\"ddf_only is dirty: \", str(type_ab_only.is_dirty()))\n", + "type_ab_only.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Now let's update the `Ensemble`'s Object table. We can see that the Object table is now considered \"dirty\" so a sync between the Source and Object tables will be triggered by computing a `batch` operation. \n", + "Now let's update the `Ensemble`'s Object table. We can see that the Object table is now considered \"dirty\" so a sync between the Source and Object tables will be triggered by computing an `Ensemble.batch()` operation. \n", "\n", - "As part of the sync the Source table has been modified to drop all sources for objects not observed via Deep Drilling Fields. This is reflected both in the `batch` result output and in the reduced number of rows in the Source table." + "As part of the sync the Source table has been modified to drop all sources for objects not with types other than 'ab'. This is reflected both in the `Ensemble.batch()` result output and in the reduced number of rows in the Source table." ] }, { @@ -757,7 +359,7 @@ "metadata": {}, "outputs": [], "source": [ - "ddf_only.update_ensemble()\n", + "type_ab_only.update_ensemble()\n", "print(\"Updated object table is now dirty: \" + str(ens.object.is_dirty()))\n", "\n", "print(\"Length of the Source table before the batch operation: \" + str(len(ens.source)))\n", @@ -774,152 +376,8 @@ "To summarize:\n", "\n", "* An operation that alters a frame marks that frame as \"dirty\"\n", - "* Such an operation on `Ensemble.source` or `Ensemble.object` won't cause a sync unless the output frame is stored back to either `Ensemble.source` or `Ensemble.object` respectively. 
This is usually done by a call to `EnsembleFrame.update_ensemble()`\n", - "* Syncs are done lazily such that even when the Object and/or Source frames are \"dirty\", a sync between tables won't be triggered until a relevant computation yields an observable output, such as `batch(..., compute=True)`" - ] - }, - { - "cell_type": "markdown", - "metadata": { - "collapsed": false - }, - "source": [ - "## Using light-curve package features\n", - "\n", - "`Ensemble.batch` also supports the use of [light-curve](https://pypi.org/project/light-curve/) package feature extractor:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.514514Z", - "start_time": "2023-08-30T14:58:37.494001Z" - } - }, - "outputs": [], - "source": [ - "import light_curve as licu\n", - "\n", - "extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())\n", - "# band_to_calc=None will ignore the band column and use all sources for each object\n", - "res = ens.batch(extractor, band_to_calc=\"g\")\n", - "res.compute()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Using a Custom Analysis Function\n", - "The analysis functions contained in TAPE are meant to provide a collection of performant, on-hand routines for common timeseries use cases. However, TAPE is also equipped to handle externally defined functions. Let's walk through a short example of defining a simple custom function and applying it through `Ensemble.batch`.\n", - "\n", - "Here we define a simple function, that returns an average flux for each photometric band. It requires an array of fluxes, an array of band labels per measurement, and has a keyword argument for determining which averaging strategy to use (mean or median)." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.519972Z", - "start_time": "2023-08-30T14:58:37.515404Z" - } - }, - "outputs": [], - "source": [ - "import numpy as np\n", - "\n", - "\n", - "# Defining a simple function\n", - "def my_flux_average(flux_array, band_array, method=\"mean\"):\n", - " \"\"\"Read in an array of fluxes, and return the average of the fluxes by band\"\"\"\n", - " res = {}\n", - " for band in np.unique(band_array):\n", - " mask = [band_array == band] # Create a band by band mask\n", - " band_fluxes = flux_array[tuple(mask)] # Mask the flux array\n", - " if method == \"mean\":\n", - " res[band] = np.mean(band_fluxes)\n", - " elif method == \"median\":\n", - " res[band] = np.median(band_fluxes)\n", - " return res" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "With the function defined, we next supply it to `Ensemble.batch`. The column labels of the `Ensemble` columns we want to use as arguments must be provided, as well as any keyword arguments. In this case, we pass along `\"flux\"` and `\"band\"`, so that the `Ensemble` will map those columns to `flux_array` and `band_array` respectively. We also pass `method='mean'`, which will pass that kwarg along to `my_flux_average`." 
- ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "ExecuteTime": { - "end_time": "2023-08-30T14:58:37.583850Z", - "start_time": "2023-08-30T14:58:37.519056Z" - } - }, - "outputs": [], - "source": [ - "# Applying the function to the ensemble\n", - "res = ens.batch(my_flux_average, \"flux\", \"band\", meta=None, method=\"median\")\n", - "res.compute()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We see that we now have a `Pandas.series` of `my_average_flux` result by object_id (lightcurve). In many cases, this may not be the ideal output for your function. This output is controlled by the `Dask` `meta` parameter. For more information on this parameter, you can read the `Dask` [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). You may pass the `meta` parameter through `Ensemble.batch`, as shown above." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Saving the Ensemble to Disk\n", - "\n", - "In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy-computation, this will involve repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be effective to instead save the ensemble state to disk after completion of these initial processing steps. To accomplish this, we can use the `Ensemble.save_ensemble` function." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "ens.save_ensemble(\n", - " \".\",\n", - " \"ensemble\",\n", - " additional_frames=[\"result_3\"],\n", - ") # Saves object, source, and result_3 to disk" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "metadata": {}, - "source": [ - "The above command creates an \"ensemble\" directory in the current working directory. This directory contains a subdirectory of parquet files for each `EnsembleFrame` object that was included in the `additional_frames` kwarg. Note that if `additional_frames` was set to True or False this would save all or none of the additional `EnsembleFrame` objects respectively, and that the object (unless it has no columns) and source frames are always saved.\n", - "\n", - "From here, we can just load the ensemble from disk." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "new_ens = Ensemble()\n", - "new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n", - "new_ens.select_frame(\"result_3\").head(5)" + "* Such an operation on `Ensemble.source` or `Ensemble.object` won't cause a sync between the Source and Object tables if the output frame has not been stored back to either `Ensemble.source` or `Ensemble.object` respectively. This is usually done by a call to `EnsembleFrame.update_ensemble()`\n", + "* Syncs are done lazily such that even when the Object and/or Source frames are \"dirty\", a sync between tables won't be triggered until a relevant computation yields an observable output, such as `batch(..., compute=True)` or `Ensemble.source.merge(result).compute()`" ] } ], "metadata": { @@ -939,7 +397,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.11" + "version": "3.10.13" }, "vscode": { "interpreter": {