Save to parquet #343
@@ -8,6 +8,7 @@
 from dask.distributed import Client
 from collections import Counter
+from collections.abc import Iterable

 from .analysis.base import AnalysisFunction
 from .analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor
@@ -801,7 +802,9 @@ def calc_nobs(self, by_band=False, label="nobs", temporary=True):
             band_counts["total"] = band_counts[list(band_counts.columns)].sum(axis=1)

             bands = band_counts.columns.values
-            self.object = self.object.assign(**{label + "_" + str(band): band_counts[band] for band in bands})
+            self.object.assign(
+                **{label + "_" + str(band): band_counts[band] for band in bands}
+            ).update_ensemble()

             if temporary:
                 self._object_temp.extend(label + "_" + str(band) for band in bands)
@@ -824,7 +827,7 @@ def calc_nobs(self, by_band=False, label="nobs", temporary=True):
                 .repartition(npartitions=self.object.npartitions)
             )

-            self.object = self.object.assign(**{label + "_total": counts[self._band_col]})
+            self.object.assign(**{label + "_total": counts[self._band_col]}).update_ensemble()

             if temporary:
                 self._object_temp.extend([label + "_total"])
@@ -1237,6 +1240,165 @@ def _standardize_batch(self, batch, on, by_band):

         return batch

+    def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **kwargs):
+        """Save the current ensemble frames to disk.
+
+        Parameters
+        ----------
+        path: 'str' or path-like, optional
+            A path to the desired location of the top-level save directory, by
+            default this is the current working directory.
+        dirname: 'str', optional
+            The name of the saved ensemble directory, "ensemble" by default.
+        additional_frames: bool, or list, optional
+            Controls whether EnsembleFrames beyond the Object and Source frames
+            are saved to disk. If True or False, this specifies whether all or
+            none of the additional frames are saved. Alternatively, a list of
+            EnsembleFrame names may be provided to specify which frames should
+            be saved. Object and Source will always be added and do not need to
+            be specified in the list. By default, all frames will be saved.
+        **kwargs:
+            Additional kwargs passed along to EnsembleFrame.to_parquet()
+
+        Returns
+        -------
+        None
+
+        Note
+        ----
+        If the object frame has no columns, which is often the case when an
+        Ensemble is constructed using only source files/dictionaries, then an
+        object subdirectory will not be created. `Ensemble.from_ensemble` will
+        know how to work with the directory whether or not the object
+        subdirectory is present.
+
+        Be careful about repeated saves to the same directory name. This will
+        not be a perfect overwrite, as any products produced by a previous save
+        may not be deleted by successive saves if they are removed from the
+        ensemble. For best results, delete the directory between saves or
+        verify that the contents are what you would expect.
+        """
+
+        self._lazy_sync_tables("all")
+
+        # Determine the path
+        ens_path = os.path.join(path, dirname)
+
+        # Compile frame list
+        if additional_frames is True:
+            frames_to_save = list(self.frames.keys())  # save all frames
+        elif additional_frames is False:
+            frames_to_save = ["object", "source"]  # save just object and source
+        elif isinstance(additional_frames, Iterable):
+            frames_to_save = [frame for frame in additional_frames if frame in list(self.frames.keys())]
Review comment: Nit: we can use sets here and give the user a more helpful error message.

Reply: Implemented in latest commit.
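The set-based validation suggested in this exchange might look like the following sketch (a hypothetical helper, not the exact code from the follow-up commit); the set difference yields the specific missing names for the error message:

```python
def compile_frames_to_save(additional_frames, known_frames):
    """Validate requested frame names against the ensemble's frame list."""
    requested = set(additional_frames)
    missing = requested - set(known_frames)
    if missing:
        # Name the exact offenders instead of a generic message
        raise ValueError(f"Frames not found in the frame list: {sorted(missing)}")
    # Object and Source are always saved alongside any requested frames
    return sorted(requested | {"object", "source"})
```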
+
+            # Raise an error if any frames were not found in the frame list
+            if len(frames_to_save) != len(additional_frames):
+                raise ValueError(
+                    "One or more frames specified in `additional_frames` was not found in the frame list."
+                )
+
+            # Make sure object and source are in the frame list
+            if "object" not in frames_to_save:
+                frames_to_save.append("object")
+            if "source" not in frames_to_save:
+                frames_to_save.append("source")
+        else:
+            raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like")
+
+        # Save the frame list to disk
+        for frame_label in frames_to_save:
+            # grab the dataframe from the frame label
+            frame = self.frames[frame_label]
+
+            # Object can have no columns, which parquet doesn't handle
+            # In this case, we'll avoid saving to parquet
+            if frame_label == "object":
+                if len(frame.columns) == 0:
+                    print("The Object Frame was not saved as no columns were present.")
+                    continue
+
+            # creates a subdirectory for the frame partition files
+            frame.to_parquet(os.path.join(ens_path, frame_label), **kwargs)
+
+        # Save a ColumnMapper file
+        col_map = self.make_column_map()
+        np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)
+
+        print(f"Saved to {os.path.join(path, dirname)}")
+
+        return
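`save_ensemble` persists the ColumnMapper as a pickled dict via `np.save`, and `from_ensemble` reads it back with `np.load(...).item()`. The round trip can be sketched standalone (the column names below are placeholders, not a real mapping):

```python
import os
import tempfile
import numpy as np

# Stand-in for ColumnMapper.map: a plain dict of column-name assignments
col_map = {"id_col": "object_id", "time_col": "mjd", "flux_col": "flux"}

with tempfile.TemporaryDirectory() as ens_path:
    # np.save pickles the dict into a 0-d object array on disk
    np.save(os.path.join(ens_path, "column_mapper.npy"), col_map)
    # .item() unwraps the 0-d object array back into the original dict
    loaded = np.load(os.path.join(ens_path, "column_mapper.npy"), allow_pickle=True).item()
```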
+    def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **kwargs):
+        """Load an ensemble from an on-disk ensemble.
+
+        Parameters
+        ----------
+        dirpath: 'str' or path-like
+            A path to the top-level ensemble directory to load from.
+        additional_frames: bool, or list, optional
+            Controls whether EnsembleFrames beyond the Object and Source frames
+            are loaded from disk. If True or False, this specifies whether all
+            or none of the additional frames are loaded. Alternatively, a list
+            of EnsembleFrame names may be provided to specify which frames
+            should be loaded. Object and Source will always be added and do not
+            need to be specified in the list. By default, all frames will be
+            loaded.
+        column_mapper: Tape.ColumnMapper object, or None, optional
+            Supplies a ColumnMapper to the Ensemble; if None (default), searches
+            for a column_mapper.npy file in the directory, which should be
+            created when the ensemble is saved.
+
+        Returns
+        -------
+        ensemble: `tape.ensemble.Ensemble`
+            The ensemble object.
+        """
+
+        # First grab the column_mapper if not specified
+        if column_mapper is None:
+            map_dict = np.load(os.path.join(dirpath, "column_mapper.npy"), allow_pickle="TRUE").item()
+            column_mapper = ColumnMapper()
+            column_mapper.map = map_dict
+
+        # Load Object and Source
+        obj_path = os.path.join(dirpath, "object")
+        src_path = os.path.join(dirpath, "source")
+
+        # Check whether object is present; it is not saved when no columns are present
+        if "object" in os.listdir(dirpath):
+            self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs)
Review comment: Should we always use … Also, as a general question that may be out of scope for this PR, are we preserving divisions here? If so, I guess it would be helpful to use …

Reply: I believe we can always have … For divisions, these are not being preserved at the moment, which definitely seems like an issue. We can save an ensemble without divisions being known, so the sorted parameter will need to depend on that information.

Reply: I just pushed another commit to better expose the sort/sorted kwargs from ensemble.from_parquet so they are usable in these cases. Some kind of metadata may still be preferable here so the user doesn't need to specify anything at all. Edit: No longer accurate, refer to next comment.

Reply: Another update, I now have this working with metadata. ensemble.save_ensemble now stores a boolean per frame in the metadata.json file indicating whether that frame had divisions set. Instead of using the sort/sorted kwargs, I've changed ensemble.from_ensemble to use this to determine whether to calculate divisions or not. In the case of object and source, this is done by setting the sorted flag or not. For the other frames, the sorted flag is only applicable to the set_index call that ensemble.from_parquet folds into.

So this PR now has the frames generate a parquet metadata file which populates divisions information if it's available, and the reading of additional frames now uses the parquet metadata file. I experimented with trying to get our ensemble.from_parquet to use these _metadata files, but it would be a non-trivial change in the logic that felt out of scope for this PR. It's potentially worth logging for the future, as using the _metadata files would avoid having to calculate the min/max values per partition to get divisions; on the other hand, these files can get quite large for large datasets, so they may have issues at scale.
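The per-frame known-divisions booleans described above could be recorded and consulted roughly as follows. This is an illustrative sketch only: the metadata.json layout and helper names are assumptions, not TAPE's exact schema (in TAPE, the boolean would come from each Dask frame's `known_divisions` attribute).

```python
import json
import os
import tempfile

def write_divisions_metadata(ens_path, frames):
    """Record which frames had known divisions at save time.

    `frames` maps frame labels to a known-divisions boolean.
    """
    with open(os.path.join(ens_path, "metadata.json"), "w") as f:
        json.dump({"known_divisions": frames}, f)

def frame_was_sorted(ens_path, label):
    """At load time, decide whether to set the sorted flag for a frame."""
    with open(os.path.join(ens_path, "metadata.json")) as f:
        meta = json.load(f)
    # Default to False so unknown frames fall back to calculating divisions
    return meta["known_divisions"].get(label, False)

# Example: a save where only the object frame had divisions set
ens_path = tempfile.mkdtemp()
write_divisions_metadata(ens_path, {"object": True, "source": False})
assert frame_was_sorted(ens_path, "object")
assert not frame_was_sorted(ens_path, "source")
```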
+        else:
+            self.from_parquet(src_path, column_mapper=column_mapper, **kwargs)
+
+        # Load all remaining frames
+        if additional_frames is False:
+            return self  # we are all done
+        else:
+            if additional_frames is True:
+                # Grab all subdirectory paths in the top-level folder, filter out any files
+                frames_to_load = [
+                    os.path.join(dirpath, f)
+                    for f in os.listdir(dirpath)
+                    if not os.path.isfile(os.path.join(dirpath, f))
+                ]
+            elif isinstance(additional_frames, Iterable):
+                frames_to_load = [os.path.join(dirpath, frame) for frame in additional_frames]
+            else:
+                raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like")
+
+            # Filter out object and source from additional frames
+            frames_to_load = [
+                frame for frame in frames_to_load if os.path.split(frame)[1] not in ["object", "source"]
+            ]
+            if len(frames_to_load) > 0:
+                for frame in frames_to_load:
+                    label = os.path.split(frame)[1]
+                    ddf = EnsembleFrame.from_parquet(frame, label=label, **kwargs)
+                    self.add_frame(ddf, label)
+
+            return self
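`from_ensemble` discovers additional frames by listing subdirectories and filtering out object, source, and plain files such as column_mapper.npy. That discovery step can be exercised in isolation; this sketch uses a hypothetical helper name and a throwaway directory layout:

```python
import os
import tempfile

def discover_additional_frames(dirpath):
    """List frame subdirectories, excluding files and the object/source frames."""
    subdirs = [
        f for f in os.listdir(dirpath)
        if not os.path.isfile(os.path.join(dirpath, f))
    ]
    return sorted(d for d in subdirs if d not in ["object", "source"])

# Example layout: a saved ensemble with one extra result frame
dirpath = tempfile.mkdtemp()
for sub in ["object", "source", "result"]:
    os.mkdir(os.path.join(dirpath, sub))
open(os.path.join(dirpath, "column_mapper.npy"), "w").close()

assert discover_additional_frames(dirpath) == ["result"]
```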
+    def from_pandas(
+        self,
+        source_frame,
Review comment: Given how liberally Jupyter users may hit "run all cells", I think this could lead to some easy mistakes. What about having an "overwrite" parameter? If True, we first try to delete the save directory (possibly printing a message if we did so). If False, we throw an exception when the save directory already exists, noting that the user should use a different name or set "overwrite=True". My instinct is that the default value of the parameter should be False.

Reply: Yeah, we have access to overwrite already via the **kwargs passed along to all the to_parquet calls. What remains is that this will only overwrite subdirectories (ensemble/object, ensemble/source, ensemble/result, etc.) that are present in both saves, so any previous subdirectories will not be deleted if a new save doesn't include them. I'm wary of doing any form of delete, since someone could specify a non-empty directory and I wouldn't want to delete anything unrelated. So: 1. maybe the overwrite parameter should graduate out of the **kwargs for visibility? And 2. I'm not sure, but maybe save could write some kind of subdirs metadata file that lets TAPE know which subdirectories it made, so that overwrite can clean those directories up in subsequent overwrite-enabled saves?

Reply: Wilson and I talked offline a bit about this. The strategy we agreed on is to generate a metadata file that records which subdirectories were produced by the save command. Successive save commands will look for this file and use it to locate which subdirectories should be removed during the save operation, in particular any subdirectories created by a previous save that will not be overwritten by the current save. Parquet will handle overwrites for conflicting directories, but ensemble.save_ensemble will need to catch and remove those frame subdirectories that would otherwise not be touched.

Reply: The latest commit implements this, with the tweak that I opted to just clean all the previous subdirectories up first, as it was logically easier.
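The guard discussed in this thread could look roughly like the following. This is a sketch only: the `overwrite` parameter and the delete-first ordering are taken from the discussion above, not from TAPE's merged code, and the helper name is hypothetical.

```python
import os
import shutil
import tempfile

def prepare_save_dir(ens_path, overwrite=False):
    """Refuse to reuse an existing save directory unless overwrite is set.

    With overwrite=True, the previous save directory is removed up front
    (mirroring the thread's "clean all the previous subdirectories up first"
    resolution) so stale frame subdirectories don't survive the new save.
    """
    if os.path.exists(ens_path):
        if not overwrite:
            raise FileExistsError(
                f"{ens_path} already exists; choose a different dirname or set overwrite=True"
            )
        shutil.rmtree(ens_path)
    os.makedirs(ens_path)

# Example: a second save without overwrite raises instead of silently mixing output
path = os.path.join(tempfile.mkdtemp(), "ensemble")
prepare_save_dir(path)
try:
    prepare_save_dir(path)
except FileExistsError:
    pass
prepare_save_dir(path, overwrite=True)  # directory is cleaned and recreated
assert os.path.isdir(path)
```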