From 11da9ebc254b16104aba156583b3d7c1e10ea113 Mon Sep 17 00:00:00 2001 From: Chris Wenneman Date: Wed, 4 Oct 2023 22:02:56 -0700 Subject: [PATCH] Changed read_dask_dataframe to call from_ method --- src/tape/ensemble.py | 44 ++++++++++---------------------------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index f226b7fe..8cc7a45e 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1709,40 +1709,16 @@ def read_dask_dataframe( dask_client = create_client new_ens = Ensemble(dask_client, **kwargs) - new_ens._load_column_mapper(column_mapper, **kwargs) - - # Set the index of the source frame and save the resulting table - new_ens._source = source_frame.set_index(new_ens._id_col, drop=True) - - if object_frame is None: # generate an indexed object table from source - new_ens._object = new_ens._generate_object_table() - new_ens._nobs_bands = [col for col in list(new_ens._object.columns) if col != new_ens._nobs_tot_col] - else: - new_ens._object = object_frame - if new_ens._nobs_band_cols is None: - # sets empty nobs cols in object - unq_filters = np.unique(new_ens._source[new_ens._band_col]) - new_ens._nobs_band_cols = [f"nobs_{filt}" for filt in unq_filters] - for col in new_ens._nobs_band_cols: - new_ens._object[col] = np.nan - - # Handle nobs_total column - if new_ens._nobs_tot_col is None: - new_ens._object["nobs_total"] = np.nan - new_ens._nobs_tot_col = "nobs_total" - - new_ens._object = new_ens._object.set_index(new_ens._id_col) - - # Optionally sync the tables, recalculates nobs columns - if sync_tables: - new_ens._source_dirty = True - new_ens._object_dirty = True - new_ens._sync_tables() - - if npartitions and npartitions > 1: - new_ens._source = new_ens._source.repartition(npartitions=npartitions) - elif partition_size: - new_ens._source = new_ens._source.repartition(partition_size=partition_size) + + new_ens.from_dask_dataframe( + source_frame=source_frame, + object_frame=object_frame, + column_mapper=column_mapper, + sync_tables=sync_tables, + npartitions=npartitions, + partition_size=partition_size, + **kwargs, + ) return new_ens