Skip to content
This repository has been archived by the owner on Jan 14, 2025. It is now read-only.

Save to parquet #343

Merged
merged 15 commits into from
Jan 11, 2024
82 changes: 57 additions & 25 deletions docs/tutorials/working_with_the_ensemble.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"np.random.seed(1) \n",
"np.random.seed(1)\n",
"\n",
"# Generate 10 astronomical objects\n",
"n_obj = 10\n",
"ids = 8000 + np.arange(n_obj)\n",
"names = ids.astype(str)\n",
"object_table = pd.DataFrame(\n",
" {\n",
" \"id\": ids, \n",
" \"id\": ids,\n",
" \"name\": names,\n",
" \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n",
" \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n",
" \"libid_cadence\": np.random.randint(1, 130, n_obj),\n",
" }\n",
")\n",
Expand All @@ -49,7 +49,7 @@
" {\n",
" \"id\": 8000 + (np.arange(num_points) % n_obj),\n",
" \"time\": np.arange(num_points),\n",
" \"flux\": np.random.random_sample(size=num_points)*10,\n",
" \"flux\": np.random.random_sample(size=num_points) * 10,\n",
" \"band\": np.repeat(all_bands, num_points / len(all_bands)),\n",
" \"error\": np.random.random_sample(size=num_points),\n",
" \"count\": np.arange(num_points),\n",
Expand Down Expand Up @@ -89,7 +89,8 @@
" flux_col=\"flux\",\n",
" err_col=\"error\",\n",
" band_col=\"band\",\n",
" npartitions=1)"
" npartitions=1,\n",
")"
]
},
{
Expand Down Expand Up @@ -124,18 +125,12 @@
"from tape.utils import ColumnMapper\n",
"\n",
"# columns assigned manually\n",
"col_map = ColumnMapper().assign(id_col=\"id\",\n",
" time_col=\"time\",\n",
" flux_col=\"flux\",\n",
" err_col=\"error\",\n",
" band_col=\"band\")\n",
"col_map = ColumnMapper().assign(\n",
" id_col=\"id\", time_col=\"time\", flux_col=\"flux\", err_col=\"error\", band_col=\"band\"\n",
")\n",
"\n",
"# Pass the ColumnMapper along to from_pandas\n",
"ens.from_pandas(\n",
" source_frame=source_table,\n",
" object_frame=object_table,\n",
" column_mapper=col_map,\n",
" npartitions=1)"
"ens.from_pandas(source_frame=source_table, object_frame=object_table, column_mapper=col_map, npartitions=1)"
]
},
{
Expand Down Expand Up @@ -616,8 +611,8 @@
"metadata": {},
"outputs": [],
"source": [
"ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n",
"ens.drop_frame(\"stetson_j\") # Drop original label\n",
"ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n",
"ens.drop_frame(\"stetson_j\") # Drop original label\n",
"\n",
"ens.select_frame(\"stetson_j_result_1\").compute()"
]
Expand Down Expand Up @@ -655,7 +650,7 @@
"ens.drop_frame(\"result_1\")\n",
"\n",
"try:\n",
" ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n",
" ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n",
"except Exception as e:\n",
" print(\"As expected, the frame 'result_1 was dropped.\\n\" + str(e))"
]
Expand Down Expand Up @@ -825,6 +820,50 @@
"We see that we now have a `Pandas.series` of `my_average_flux` result by object_id (lightcurve). In many cases, this may not be the ideal output for your function. This output is controlled by the `Dask` `meta` parameter. For more information on this parameter, you can read the `Dask` [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). You may pass the `meta` parameter through `Ensemble.batch`, as shown above."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Saving the Ensemble to Disk\n",
"\n",
"In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy-computation, this will involve repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be effective to instead save the ensemble state to disk after completion of these initial processing steps. To accomplish this, we can use the `Ensemble.save_ensemble` function."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ens.save_ensemble(\n",
" \".\",\n",
" \"ensemble\",\n",
" additional_frames=[\"result_3\"],\n",
") # Saves object, source, and result_3 to disk"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The above command creates an \"ensemble\" directory in the current working directory. This directory contains a subdirectory of parquet files for each `EnsembleFrame` object that was included in the `additional_frames` kwarg. Note that if `additional_frames` was set to True or False this would save all or none of the additional `EnsembleFrame` objects respectively, and that the object (unless it has no columns) and source frames are always saved.\n",
"\n",
"From here, we can just load the ensemble from disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new_ens = Ensemble(client=ens.client) # use the same client\n",
"new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n",
"new_ens.select_frame(\"result_3\").head(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand All @@ -838,13 +877,6 @@
"source": [
"ens.client.close() # Tear down the ensemble client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
Loading