From 02e1c73a1dd28e3be91aa6d388d83494917e8f5e Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 11 Dec 2023 11:57:22 -0800 Subject: [PATCH] remove provenance machinery for now --- src/tape/ensemble.py | 25 ------------------- src/tape/ensemble_readers.py | 5 ---- src/tape/utils/column_mapper/column_mapper.py | 16 ++---------- tests/tape_tests/test_ensemble.py | 11 ++------ tests/tape_tests/test_utils.py | 6 ----- 5 files changed, 4 insertions(+), 59 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 5d654232..3c2bb2c6 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -69,7 +69,6 @@ def __init__(self, client=True, **kwargs): self._flux_col = None self._err_col = None self._band_col = None - self._provenance_col = None self.client = None self.cleanup_client = False @@ -256,7 +255,6 @@ def insert_sources( timestamps, fluxes, flux_errs=None, - provenance_label="custom", force_repartition=False, **kwargs, ): @@ -282,8 +280,6 @@ def insert_sources( A list of the fluxes of the observations. flux_errs: `list`, optional A list of the errors in the flux. - provenance_label: `str`, optional - A label that denotes the provenance of the new observations. force_repartition: `bool` optional Do an immediate repartition of the dataframes. """ @@ -302,16 +298,12 @@ def insert_sources( f"Incorrect flux_errs length during insert" f"{num_inserting} != {len(flux_errs)}" ) - # Construct provenance array - provenance = [provenance_label] * len(obj_ids) - # Create a dictionary with the new information. rows = { self._id_col: obj_ids, self._band_col: bands, self._time_col: timestamps, self._flux_col: fluxes, - self._provenance_col: provenance, } if flux_errs is not None: rows[self._err_col] = flux_errs @@ -1350,7 +1342,6 @@ def make_column_map(self): flux_col=self._flux_col, err_col=self._err_col, band_col=self._band_col, - provenance_col=self._provenance_col, ) return result @@ -1408,11 +1399,6 @@ def _load_column_mapper(self, column_mapper, **kwargs): self._flux_col = column_mapper.map["flux_col"] self._err_col = column_mapper.map["err_col"] self._band_col = column_mapper.map["band_col"] - - # Assign optional columns if provided - if column_mapper.map["provenance_col"] is not None: - self._provenance_col = column_mapper.map["provenance_col"] - else: raise ValueError(f"Missing required column mapping information: {needed}") @@ -1423,7 +1409,6 @@ def from_parquet( source_file, object_file=None, column_mapper=None, - provenance_label="survey_1", sync_tables=True, additional_cols=True, npartitions=None, @@ -1446,8 +1431,6 @@ def from_parquet( column_mapper: 'ColumnMapper' object If provided, the ColumnMapper is used to populate relevant column information mapped from the input dataset. - provenance_label: 'str', optional - Determines the label to use if a provenance column is generated sync_tables: 'bool', optional In the case where object files are loaded in, determines whether an initial sync is performed between the object and source tables. If @@ -1484,8 +1467,6 @@ def from_parquet( columns = None # None will prompt read_parquet to read in all cols else: columns = [self._time_col, self._flux_col, self._err_col, self._band_col] - if self._provenance_col is not None: - columns.append(self._provenance_col) # Read in the source parquet file(s) # Index is set False so that we can set it with a future set_index call @@ -1493,11 +1474,6 @@ def from_parquet( # on the divisions between the sources of different objects. source = SourceFrame.from_parquet(source_file, index=False, columns=columns, ensemble=self) - # Generate a provenance column if not provided - if self._provenance_col is None: - source["provenance"] = provenance_label - self._provenance_col = "provenance" - object = None if object_file: # Read in the object file(s) @@ -1550,7 +1526,6 @@ def from_dataset(self, dataset, **kwargs): source_file=dataset_info["source_file"], object_file=dataset_info["object_file"], column_mapper=col_map, - provenance_label=dataset, **kwargs, ) diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py index 119bb206..9b636024 100644 --- a/src/tape/ensemble_readers.py +++ b/src/tape/ensemble_readers.py @@ -132,7 +132,6 @@ def read_parquet( object_file=None, column_mapper=None, dask_client=True, - provenance_label="survey_1", sync_tables=True, additional_cols=True, npartitions=None, @@ -158,8 +157,6 @@ def read_parquet( `client=True`, passing any additional kwargs to a dask.distributed.Client constructor call. If `client=False`, the Ensemble is created without a distributed client. - provenance_label: 'str', optional - Determines the label to use if a provenance column is generated sync_tables: 'bool', optional In the case where object files are loaded in, determines whether an initial sync is performed between the object and source tables. If @@ -188,7 +185,6 @@ def read_parquet( source_file=source_file, object_file=object_file, column_mapper=column_mapper, - provenance_label=provenance_label, sync_tables=sync_tables, additional_cols=additional_cols, npartitions=npartitions, @@ -322,7 +318,6 @@ def read_dataset(dataset, dask_client=True, **kwargs): source_file=dataset_info["source_file"], object_file=dataset_info["object_file"], column_mapper=col_map, - provenance_label=dataset, dask_client=dask_client, **kwargs, ) diff --git a/src/tape/utils/column_mapper/column_mapper.py b/src/tape/utils/column_mapper/column_mapper.py index 48d3ee6e..c43e622d 100644 --- a/src/tape/utils/column_mapper/column_mapper.py +++ b/src/tape/utils/column_mapper/column_mapper.py @@ -11,7 +11,6 @@ def __init__( flux_col=None, err_col=None, band_col=None, - provenance_col=None, ): """ @@ -27,9 +26,6 @@ def __init__( Identifies which column contains the flux/mag error information band_col: 'str', optional Identifies which column contains the band information - provenance_col: 'str', optional - Identifies which column contains the provenance information, if - None the provenance column is generated. Returns ------- @@ -44,7 +40,6 @@ def __init__( "flux_col": flux_col, "err_col": err_col, "band_col": band_col, - "provenance_col": provenance_col, } self.required = [ @@ -53,7 +48,6 @@ def __init__( Column("flux_col", True), Column("err_col", True), Column("band_col", True), - Column("provenance_col", False), ] self.known_maps = {"ZTF": ZTFColumnMapper} @@ -76,8 +70,8 @@ def use_known_map(self, map_id): Returns ------- - A ColumnMapper subclass object dependent on the map_id provided, for example - ZTFColumnMapper in the case of "ZTF" + A ColumnMapper subclass object dependent on the map_id provided, + ZTFColumnMapper in the case of "ZTF" for example """ if map_id in self.known_maps: @@ -122,7 +116,6 @@ def assign( flux_col=None, err_col=None, band_col=None, - provenance_col=None, ): """Updates a given set of columns @@ -144,9 +137,6 @@ def assign( nobs_tot_col: 'str', optional Identifies which column contains the total number of observations, if available in the input object file - provenance_col: 'str', optional - Identifies which column contains the provenance information, if - None the provenance column is generated. """ assign_map = { "id_col": id_col, @@ -154,7 +144,6 @@ def assign( "flux_col": flux_col, "err_col": err_col, "band_col": band_col, - "provenance_col": provenance_col, } for item in assign_map.items(): @@ -175,7 +164,6 @@ def _set_known_map(self): "flux_col": "psFlux", "err_col": "psFluxErr", "band_col": "filterName", - "provenance_col": None, } return self diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 40a0264a..4e580fdc 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -91,7 +91,6 @@ def test_parquet_construction(data_fixture, request): parquet_ensemble._flux_col, parquet_ensemble._err_col, parquet_ensemble._band_col, - parquet_ensemble._provenance_col, ]: # Check to make sure the critical quantity labels are bound to real columns assert parquet_ensemble.source[col] is not None @@ -488,7 +487,6 @@ def test_insert(parquet_ensemble): flux_col = parquet_ensemble._flux_col err_col = parquet_ensemble._err_col band_col = parquet_ensemble._band_col - prov_col = parquet_ensemble._provenance_col # Test an insertion of 5 observations. new_inds = [2, 1, 100, 110, 111] @@ -512,7 +510,6 @@ def test_insert(parquet_ensemble): assert new_source.loc[new_inds[i]][flux_col] == new_fluxes[i] assert new_source.loc[new_inds[i]][err_col] == new_errs[i] assert new_source.loc[new_inds[i]][band_col] == new_bands[i] - assert new_source.loc[new_inds[i]][prov_col] == "custom" # Check that all of the old data is still in there. obj_ids = old_source.index.unique() @@ -553,7 +550,6 @@ def test_insert_paritioned(dask_client): flux_col="flux", err_col="err", band_col="band", - provenance_col="provenance", ) ens.from_source_dict(rows, column_mapper=cmap, npartitions=4, sort=True) @@ -725,14 +721,12 @@ def test_update_column_map(dask_client): assert cmap_1.map["flux_col"] == "flux" assert cmap_1.map["err_col"] == "err" assert cmap_1.map["band_col"] == "band" - assert cmap_1.map["provenance_col"] is None # Update the column map. - ens.update_column_mapping(flux_col="f2", provenance_col="p") + ens.update_column_mapping(flux_col="f2") - # Check that the flux and provenance columns have been updates. + # Check that the flux column has been updated. assert ens._flux_col == "f2" - assert ens._provenance_col == "p" # Check that we retrieve the updated column map. cmap_2 = ens.make_column_map() @@ -741,7 +735,6 @@ def test_update_column_map(dask_client): assert cmap_2.map["flux_col"] == "f2" assert cmap_2.map["err_col"] == "err" assert cmap_2.map["band_col"] == "band" - assert cmap_2.map["provenance_col"] == "p" @pytest.mark.parametrize( diff --git a/tests/tape_tests/test_utils.py b/tests/tape_tests/test_utils.py index 0a75aff8..124e3ab2 100644 --- a/tests/tape_tests/test_utils.py +++ b/tests/tape_tests/test_utils.py @@ -22,16 +22,12 @@ def test_column_mapper(): assert col_map.is_ready() # col_map should now be ready - # Assign the remaining columns - col_map.assign(provenance_col="provenance") - expected_map = { "id_col": "id", "time_col": "time", "flux_col": "flux", "err_col": "err", "band_col": "band", - "provenance_col": "provenance", } assert col_map.map == expected_map # The expected mapping @@ -48,7 +44,6 @@ def test_column_mapper_init(): flux_col="flux", err_col="err", band_col="band", - provenance_col="provenance", ) assert col_map.is_ready() # col_map should be ready @@ -59,7 +54,6 @@ def test_column_mapper_init(): "flux_col": "flux", "err_col": "err", "band_col": "band", - "provenance_col": "provenance", } assert col_map.map == expected_map # The expected mapping