Save to parquet #343

Merged

merged 15 commits on Jan 11, 2024
82 changes: 57 additions & 25 deletions docs/tutorials/working_with_the_ensemble.ipynb
@@ -26,17 +26,17 @@
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"np.random.seed(1) \n",
"np.random.seed(1)\n",
"\n",
"# Generate 10 astronomical objects\n",
"n_obj = 10\n",
"ids = 8000 + np.arange(n_obj)\n",
"names = ids.astype(str)\n",
"object_table = pd.DataFrame(\n",
" {\n",
" \"id\": ids, \n",
" \"id\": ids,\n",
" \"name\": names,\n",
" \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n",
" \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n",
" \"libid_cadence\": np.random.randint(1, 130, n_obj),\n",
" }\n",
")\n",
@@ -49,7 +49,7 @@
" {\n",
" \"id\": 8000 + (np.arange(num_points) % n_obj),\n",
" \"time\": np.arange(num_points),\n",
" \"flux\": np.random.random_sample(size=num_points)*10,\n",
" \"flux\": np.random.random_sample(size=num_points) * 10,\n",
" \"band\": np.repeat(all_bands, num_points / len(all_bands)),\n",
" \"error\": np.random.random_sample(size=num_points),\n",
" \"count\": np.arange(num_points),\n",
@@ -89,7 +89,8 @@
" flux_col=\"flux\",\n",
" err_col=\"error\",\n",
" band_col=\"band\",\n",
" npartitions=1)"
" npartitions=1,\n",
")"
]
},
{
@@ -124,18 +125,12 @@
"from tape.utils import ColumnMapper\n",
"\n",
"# columns assigned manually\n",
"col_map = ColumnMapper().assign(id_col=\"id\",\n",
" time_col=\"time\",\n",
" flux_col=\"flux\",\n",
" err_col=\"error\",\n",
" band_col=\"band\")\n",
"col_map = ColumnMapper().assign(\n",
" id_col=\"id\", time_col=\"time\", flux_col=\"flux\", err_col=\"error\", band_col=\"band\"\n",
")\n",
"\n",
"# Pass the ColumnMapper along to from_pandas\n",
"ens.from_pandas(\n",
" source_frame=source_table,\n",
" object_frame=object_table,\n",
" column_mapper=col_map,\n",
" npartitions=1)"
"ens.from_pandas(source_frame=source_table, object_frame=object_table, column_mapper=col_map, npartitions=1)"
]
},
{
@@ -616,8 +611,8 @@
"metadata": {},
"outputs": [],
"source": [
"ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n",
"ens.drop_frame(\"stetson_j\") # Drop original label\n",
"ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n",
"ens.drop_frame(\"stetson_j\") # Drop original label\n",
"\n",
"ens.select_frame(\"stetson_j_result_1\").compute()"
]
@@ -655,7 +650,7 @@
"ens.drop_frame(\"result_1\")\n",
"\n",
"try:\n",
" ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n",
" ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n",
"except Exception as e:\n",
" print(\"As expected, the frame 'result_1 was dropped.\\n\" + str(e))"
]
@@ -825,6 +820,50 @@
"We see that we now have a `Pandas.series` of `my_average_flux` result by object_id (lightcurve). In many cases, this may not be the ideal output for your function. This output is controlled by the `Dask` `meta` parameter. For more information on this parameter, you can read the `Dask` [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). You may pass the `meta` parameter through `Ensemble.batch`, as shown above."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Saving the Ensemble to Disk\n",
"\n",
"In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy-computation, this will involve repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be effective to instead save the ensemble state to disk after completion of these initial processing steps. To accomplish this, we can use the `Ensemble.save_ensemble` function."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ens.save_ensemble(\n",
" \".\",\n",
" \"ensemble\",\n",
" additional_frames=[\"result_3\"],\n",
") # Saves object, source, and result_3 to disk"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The above command creates an \"ensemble\" directory in the current working directory. This directory contains a subdirectory of parquet files for each `EnsembleFrame` object that was included in the `additional_frames` kwarg. Note that if `additional_frames` was set to True or False this would save all or none of the additional `EnsembleFrame` objects respectively, and that the object (unless it has no columns) and source frames are always saved.\n",
"\n",
"From here, we can just load the ensemble from disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new_ens = Ensemble(client=ens.client) # use the same client\n",
"new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n",
"new_ens.select_frame(\"result_3\").head(5)"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -838,13 +877,6 @@
"source": [
"ens.client.close() # Tear down the ensemble client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
166 changes: 164 additions & 2 deletions src/tape/ensemble.py
@@ -8,6 +8,7 @@

from dask.distributed import Client
from collections import Counter
from collections.abc import Iterable

from .analysis.base import AnalysisFunction
from .analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor
@@ -801,7 +802,9 @@ def calc_nobs(self, by_band=False, label="nobs", temporary=True):
band_counts["total"] = band_counts[list(band_counts.columns)].sum(axis=1)

bands = band_counts.columns.values
self.object = self.object.assign(**{label + "_" + str(band): band_counts[band] for band in bands})
self.object.assign(
**{label + "_" + str(band): band_counts[band] for band in bands}
).update_ensemble()

if temporary:
self._object_temp.extend(label + "_" + str(band) for band in bands)
@@ -824,7 +827,7 @@ def calc_nobs(self, by_band=False, label="nobs", temporary=True):
.repartition(npartitions=self.object.npartitions)
)

self.object = self.object.assign(**{label + "_total": counts[self._band_col]})
self.object.assign(**{label + "_total": counts[self._band_col]}).update_ensemble()

if temporary:
self._object_temp.extend([label + "_total"])
@@ -1237,6 +1240,165 @@ def _standardize_batch(self, batch, on, by_band):

return batch

def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **kwargs):
"""Save the current ensemble frames to disk.

Parameters
----------
path: 'str' or path-like, optional
A path to the desired location of the top-level save directory, by
default this is the current working directory.
dirname: 'str', optional
The name of the saved ensemble directory, "ensemble" by default.
additional_frames: bool, or list, optional
Controls whether EnsembleFrames beyond the Object and Source Frames
are saved to disk. If True or False, this specifies whether all or
none of the additional frames are saved. Alternatively, a list of
EnsembleFrame names may be provided to specify which frames should
be saved. Object and Source will always be added and do not need to
be specified in the list. By default, all frames will be saved.
**kwargs:
Additional kwargs passed along to EnsembleFrame.to_parquet()

Returns
----------
None

Note
----
If the object frame has no columns, which is often the case when an
Ensemble is constructed using only source files/dictionaries, then an
object subdirectory will not be created. `Ensemble.from_ensemble` will
know how to work with the directory whether or not the object
subdirectory is present.

Be careful about repeated saves to the same directory name.
Collaborator:

Given how liberally jupyter users may hit "run all cells", I think this could lead to some easy mistakes.

What about having an "overwrite" parameter?

If True, we will first try to delete the save directory (possibly printing a message if we did so)

If False, we can throw an exception if the save directory already exists noting they should use a different name or set "overwrite=True"

My instinct is that the default value of the parameter should be False.

Collaborator (author):

Yeah, so we have access to overwrite already via the **kwargs passed along to all the to_parquet calls. What remains is that it will only overwrite subdirectories (ensemble/object, ensemble/source, ensemble/result, etc.) that are present in both saves, so any previous subdirectories will not be deleted if a new save doesn't include them. I'm really worried about doing any form of delete, since someone could specify a non-empty directory and I wouldn't want to delete anything else in it.

So, 1. maybe the overwrite parameter should graduate out of the **kwargs for visibility? And 2. I'm not sure, but maybe we could have save write some kind of subdirs metadata file that lets TAPE know which subdirectories it made, so that overwrite can clean those directories up in subsequent overwrite-enabled saves?

Collaborator (author):

Wilson and I talked offline a bit on this. The strategy that we agreed on is to generate a metadata file that knows which subdirectories were produced by the save command. Successive save commands will look for this file and use it to locate which subdirectories in the directory should be removed during the save operation. In particular, we'd be looking for any subdirectories created by a previous save command that will not be overwritten by the current save command. Parquet will handle overwrites for conflicting directories, but we will need ensemble.save_ensemble to catch and remove these frame subdirectories that would otherwise not be touched.

Collaborator (author):

The latest commit implements this, with the tweak that I ended up opting to just clean all the previous subdirectories up first as it was logically easier.
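
A rough illustration of that strategy, assuming a hypothetical metadata schema and helper names (the PR's actual implementation may differ):

import json
import os
import shutil


def _record_save(ens_path, saved_subdirs):
    # Remember which subdirectories this save produced.
    with open(os.path.join(ens_path, "metadata.json"), "w") as f:
        json.dump({"subdirs": saved_subdirs}, f)


def _clean_previous_save(ens_path):
    # Remove only subdirectories recorded by a previous save, leaving any
    # unrelated contents of the directory untouched.
    meta_file = os.path.join(ens_path, "metadata.json")
    if os.path.exists(meta_file):
        with open(meta_file) as f:
            for subdir in json.load(f).get("subdirs", []):
                shutil.rmtree(os.path.join(ens_path, subdir), ignore_errors=True)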

This will not be a perfect overwrite, as any products produced by a previous save
may not be deleted by successive saves if they are removed from the
ensemble. For best results, delete the directory between saves or
verify that the contents are what you would expect.
"""

self._lazy_sync_tables("all")

# Determine the path
ens_path = os.path.join(path, dirname)

# Compile frame list
if additional_frames is True:
frames_to_save = list(self.frames.keys()) # save all frames
elif additional_frames is False:
frames_to_save = ["object", "source"] # save just object and source
elif isinstance(additional_frames, Iterable):
frames_to_save = [frame for frame in additional_frames if frame in list(self.frames.keys())]
Collaborator:

Nit: we can use sets here and give the user a more helpful error message

frames_to_save = set(additional_frames)
invalid_frames = frames_to_save.difference(set(self.frames.keys()))

# Raise error and tell user the invalid frames
if len(invalid_frames) > 0:
   ...

frames_to_save.update(["object", "source"])

Collaborator (author):

Implemented in latest commit


# Raise an error if any frames were not found in the frame list
if len(frames_to_save) != len(additional_frames):
raise ValueError(
"One or more frames specified in `additional_frames` was not found in the frame list."
)

# Make sure object and source are in the frame list
if "object" not in frames_to_save:
frames_to_save.append("object")
if "source" not in frames_to_save:
frames_to_save.append("source")
else:
raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like")

# Save the frame list to disk
for frame_label in frames_to_save:
# grab the dataframe from the frame label
frame = self.frames[frame_label]

# Object can have no columns, which parquet doesn't handle
# In this case, we'll avoid saving to parquet
if frame_label == "object":
if len(frame.columns) == 0:
print("The Object Frame was not saved as no columns were present.")
continue

# creates a subdirectory for the frame partition files
frame.to_parquet(os.path.join(ens_path, frame_label), **kwargs)

# Save a ColumnMapper file
col_map = self.make_column_map()
np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)

print(f"Saved to {os.path.join(path, dirname)}")

return

def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **kwargs):
"""Load an ensemble from an on-disk ensemble.

Parameters
----------
dirpath: 'str' or path-like
A path to the top-level ensemble directory to load from.
additional_frames: bool, or list, optional
Controls whether EnsembleFrames beyond the Object and Source Frames
are loaded from disk. If True or False, this specifies whether all
or none of the additional frames are loaded. Alternatively, a list
of EnsembleFrame names may be provided to specify which frames
should be loaded. Object and Source will always be added and do not
need to be specified in the list. By default, all frames will be
loaded.
column_mapper: Tape.ColumnMapper object, or None, optional
Supplies a ColumnMapper to the Ensemble, if None (default) searches
for a column_mapper.npy file in the directory, which should be
created when the ensemble is saved.

Returns
----------
ensemble: `tape.ensemble.Ensemble`
The ensemble object.
"""

# First grab the column_mapper if not specified
if column_mapper is None:
map_dict = np.load(os.path.join(dirpath, "column_mapper.npy"), allow_pickle="TRUE").item()
column_mapper = ColumnMapper()
column_mapper.map = map_dict

# Load Object and Source
obj_path = os.path.join(dirpath, "object")
src_path = os.path.join(dirpath, "source")

# Check whether object is present; it's not saved when no columns are present
if "object" in os.listdir(dirpath):
self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs)
Collaborator:

Should we always use sync_tables=False since we synced object and source before saving?

Also as a general question that may be out of scope for this PR, are we preserving divisions here? If so I guess it would be helpful to use sorted=True if we knew that information?

Collaborator (author):

I believe we can always have sync_tables=False, right, good point!

For divisions, these are not being preserved at the moment, which definitely seems like an issue. We can save an ensemble without divisions being known, so the sorted parameter will need to be dependent on that information.
Similar to above, maybe there's a need for a metadata file that should be generated to indicate whether divisions are known that can be read in?

Collaborator (author) @dougbrn commented Jan 10, 2024:

I just pushed another commit to better expose the sort/sorted kwargs from ensemble.from_parquet so they are usable in these cases. Some kind of metadata may be preferable here still so the user doesn't need to specify at all.

Edit: No longer accurate, refer to next comment

Collaborator (author):

Another update, I now have this working with metadata. ensemble.save_ensemble now stores a boolean per frame in the metadata.json file indicating whether that frame had divisions set. Instead of using the sort/sorted kwargs, I've changed ensemble.from_ensemble to use this to determine whether to calculate divisions or not. In the case of object and source, this is just by setting the sorted flag or not. For the other frames, the sorted flag is only applicable to the set_index call that ensemble.from_parquet folds into. So this PR now has the frames generate a parquet metadata file which populates divisions information if it's available, and the reading of additional frames now uses the parquet metadata file.

I experimented with trying to get our ensemble.from_parquet to use these _metadata files, but it would be a non-trivial change in the logic that felt out of scope for this PR. It's potentially worth logging for the future as using the _metadata files would avoid having to calculate the min/max values per partition to get divisions, but on the other hand these files can get quite large for large datasets so they may have issues at scale.
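
A hedged sketch of the divisions bookkeeping described above (the key names and file layout here are assumptions; `known_divisions` is the standard Dask attribute):

import json
import os


def _write_divisions_metadata(ens_path, frames):
    # Record, per frame label, whether divisions were known at save time.
    meta = {label: bool(frame.known_divisions) for label, frame in frames.items()}
    with open(os.path.join(ens_path, "metadata.json"), "w") as f:
        json.dump(meta, f)

On load, a frame flagged True could then be read with sorted=True (or have its divisions recalculated), while the others are loaded without division information.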

else:
self.from_parquet(src_path, column_mapper=column_mapper, **kwargs)

# Load all remaining frames
if additional_frames is False:
return self # we are all done
else:
if additional_frames is True:
# Grab all subdirectory paths in the top-level folder, filter out any files
frames_to_load = [
os.path.join(dirpath, f)
for f in os.listdir(dirpath)
if not os.path.isfile(os.path.join(dirpath, f))
]
elif isinstance(additional_frames, Iterable):
frames_to_load = [os.path.join(dirpath, frame) for frame in additional_frames]
else:
raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like")

# Filter out object and source from additional frames
frames_to_load = [
frame for frame in frames_to_load if os.path.split(frame)[1] not in ["object", "source"]
]
if len(frames_to_load) > 0:
for frame in frames_to_load:
label = os.path.split(frame)[1]
ddf = EnsembleFrame.from_parquet(frame, label=label, **kwargs)
self.add_frame(ddf, label)

return self

def from_pandas(
self,
source_frame,
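For orientation, a small usage sketch of the save/load round trip added in this file; the frame label "result_1" is illustrative, and `overwrite=True` is simply forwarded to each frame's `to_parquet` call via `**kwargs`:

from tape import Ensemble

ens = Ensemble(client=False)  # no distributed client needed for this sketch
# ... ingest data and run analyses that produce a "result_1" frame ...

# Save object, source, and only the listed additional frame
ens.save_ensemble(".", "ensemble", additional_frames=["result_1"], overwrite=True)

# Later, rebuild the ensemble from the saved directory
new_ens = Ensemble(client=False)
new_ens.from_ensemble("./ensemble", additional_frames=True)
new_ens.select_frame("result_1").compute()
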
38 changes: 38 additions & 0 deletions src/tape/ensemble_readers.py
@@ -10,6 +10,44 @@
from tape.utils import ColumnMapper


def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_client=True, **kwargs):
"""Load an ensemble from an on-disk ensemble.

Parameters
----------
dirpath: 'str' or path-like
A path to the top-level ensemble directory to load from.
additional_frames: bool, or list, optional
Controls whether EnsembleFrames beyond the Object and Source Frames
are loaded from disk. If True or False, this specifies whether all
or none of the additional frames are loaded. Alternatively, a list
of EnsembleFrame names may be provided to specify which frames
should be loaded. Object and Source will always be added and do not
need to be specified in the list. By default, all frames will be
loaded.
column_mapper: Tape.ColumnMapper object, or None, optional
Supplies a ColumnMapper to the Ensemble, if None (default) searches
for a column_mapper.npy file in the directory, which should be
created when the ensemble is saved.
dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`client=True`, passing any additional kwargs to a
dask.distributed.Client constructor call. If `client=False`, the
Ensemble is created without a distributed client.

Returns
----------
ensemble: `tape.ensemble.Ensemble`
An ensemble object.
"""

new_ens = Ensemble(dask_client, **kwargs)

new_ens.from_ensemble(dirpath, additional_frames=additional_frames, column_mapper=column_mapper, **kwargs)

return new_ens


def read_pandas_dataframe(
source_frame,
object_frame=None,
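A possible call pattern for the new reader (the path and frame label are illustrative):

from tape.ensemble_readers import read_ensemble

ens = read_ensemble("./ensemble", additional_frames=True, dask_client=False)
ens.select_frame("result_1").head(5)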