Commit

remove provenance machinery for now
dougbrn committed Dec 11, 2023
1 parent 1e8abff commit 02e1c73
Showing 5 changed files with 4 additions and 59 deletions.
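
Taken together, the change removes the optional provenance column from the ColumnMapper contract and drops the provenance_label arguments from the loading and insertion APIs. A minimal post-commit sketch (column values are hypothetical; assumes ColumnMapper is importable from tape.utils, as in the tests touched below):

from tape import Ensemble
from tape.utils import ColumnMapper

# Only the five required columns remain; provenance_col is no longer a parameter.
cmap = ColumnMapper(
    id_col="id",
    time_col="time",
    flux_col="flux",
    err_col="err",
    band_col="band",
)
ens = Ensemble(client=False)  # client=False skips the distributed client
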
src/tape/ensemble.py (25 changes: 0 additions & 25 deletions)
@@ -69,7 +69,6 @@ def __init__(self, client=True, **kwargs):
         self._flux_col = None
         self._err_col = None
         self._band_col = None
-        self._provenance_col = None

         self.client = None
         self.cleanup_client = False
@@ -256,7 +255,6 @@ def insert_sources(
         timestamps,
         fluxes,
         flux_errs=None,
-        provenance_label="custom",
         force_repartition=False,
         **kwargs,
     ):
@@ -282,8 +280,6 @@
             A list of the fluxes of the observations.
         flux_errs: `list`, optional
             A list of the errors in the flux.
-        provenance_label: `str`, optional
-            A label that denotes the provenance of the new observations.
         force_repartition: `bool` optional
             Do an immediate repartition of the dataframes.
         """
@@ -302,16 +298,12 @@
                 f"Incorrect flux_errs length during insert" f"{num_inserting} != {len(flux_errs)}"
             )

-        # Construct provenance array
-        provenance = [provenance_label] * len(obj_ids)
-
         # Create a dictionary with the new information.
         rows = {
             self._id_col: obj_ids,
             self._band_col: bands,
             self._time_col: timestamps,
             self._flux_col: fluxes,
-            self._provenance_col: provenance,
         }
         if flux_errs is not None:
             rows[self._err_col] = flux_errs
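
With the hunk above applied, insert_sources builds its row dictionary from the required columns only. A hedged usage sketch (values are illustrative; assumes the ens and cmap from the first example, and that obj_ids and bands are the leading parameters, as the method body suggests):

# Illustrative values; provenance_label="custom" is no longer accepted.
ens.insert_sources(
    obj_ids=[1, 1, 2],
    bands=["g", "r", "g"],
    timestamps=[60000.0, 60001.0, 60002.0],
    fluxes=[10.1, 11.2, 9.8],
    flux_errs=[0.1, 0.2, 0.1],
)
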
@@ -1350,7 +1342,6 @@ def make_column_map(self):
            flux_col=self._flux_col,
            err_col=self._err_col,
            band_col=self._band_col,
-           provenance_col=self._provenance_col,
        )
        return result

@@ -1408,11 +1399,6 @@ def _load_column_mapper(self, column_mapper, **kwargs):
            self._flux_col = column_mapper.map["flux_col"]
            self._err_col = column_mapper.map["err_col"]
            self._band_col = column_mapper.map["band_col"]
-
-           # Assign optional columns if provided
-           if column_mapper.map["provenance_col"] is not None:
-               self._provenance_col = column_mapper.map["provenance_col"]
-
        else:
            raise ValueError(f"Missing required column mapping information: {needed}")

@@ -1423,7 +1409,6 @@ def from_parquet(
        source_file,
        object_file=None,
        column_mapper=None,
-       provenance_label="survey_1",
        sync_tables=True,
        additional_cols=True,
        npartitions=None,
@@ -1446,8 +1431,6 @@
        column_mapper: 'ColumnMapper' object
            If provided, the ColumnMapper is used to populate relevant column
            information mapped from the input dataset.
-       provenance_label: 'str', optional
-           Determines the label to use if a provenance column is generated
        sync_tables: 'bool', optional
            In the case where object files are loaded in, determines whether an
            initial sync is performed between the object and source tables. If
@@ -1484,20 +1467,13 @@
            columns = None # None will prompt read_parquet to read in all cols
        else:
            columns = [self._time_col, self._flux_col, self._err_col, self._band_col]
-           if self._provenance_col is not None:
-               columns.append(self._provenance_col)

        # Read in the source parquet file(s)
        # Index is set False so that we can set it with a future set_index call
        # This has the advantage of letting Dask set partition boundaries based
        # on the divisions between the sources of different objects.
        source = SourceFrame.from_parquet(source_file, index=False, columns=columns, ensemble=self)

-       # Generate a provenance column if not provided
-       if self._provenance_col is None:
-           source["provenance"] = provenance_label
-           self._provenance_col = "provenance"
-
        object = None
        if object_file:
            # Read in the object file(s)
@@ -1550,7 +1526,6 @@ def from_dataset(self, dataset, **kwargs):
            source_file=dataset_info["source_file"],
            object_file=dataset_info["object_file"],
            column_mapper=col_map,
-           provenance_label=dataset,
            **kwargs,
        )

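After these hunks, from_parquet reads only the mapped columns and no longer synthesizes a provenance column. A sketch of a post-commit load (paths are hypothetical; the keyword arguments mirror the signature shown above):

# Hypothetical paths; sync_tables keeps the object and source tables consistent.
ens.from_parquet(
    source_file="data/source.parquet",
    object_file="data/object.parquet",
    column_mapper=cmap,
    sync_tables=True,
)
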
src/tape/ensemble_readers.py (5 changes: 0 additions & 5 deletions)
@@ -132,7 +132,6 @@ def read_parquet(
    object_file=None,
    column_mapper=None,
    dask_client=True,
-   provenance_label="survey_1",
    sync_tables=True,
    additional_cols=True,
    npartitions=None,
@@ -158,8 +157,6 @@
        `client=True`, passing any additional kwargs to a
        dask.distributed.Client constructor call. If `client=False`, the
        Ensemble is created without a distributed client.
-   provenance_label: 'str', optional
-       Determines the label to use if a provenance column is generated
    sync_tables: 'bool', optional
        In the case where object files are loaded in, determines whether an
        initial sync is performed between the object and source tables. If
@@ -188,7 +185,6 @@
        source_file=source_file,
        object_file=object_file,
        column_mapper=column_mapper,
-       provenance_label=provenance_label,
        sync_tables=sync_tables,
        additional_cols=additional_cols,
        npartitions=npartitions,
@@ -322,7 +318,6 @@ def read_dataset(dataset, dask_client=True, **kwargs):
        source_file=dataset_info["source_file"],
        object_file=dataset_info["object_file"],
        column_mapper=col_map,
-       provenance_label=dataset,
        dask_client=dask_client,
        **kwargs,
    )
src/tape/utils/column_mapper/column_mapper.py (16 changes: 2 additions & 14 deletions)
@@ -11,7 +11,6 @@ def __init__(
        flux_col=None,
        err_col=None,
        band_col=None,
-       provenance_col=None,
    ):
        """
@@ -27,9 +26,6 @@
            Identifies which column contains the flux/mag error information
        band_col: 'str', optional
            Identifies which column contains the band information
-       provenance_col: 'str', optional
-           Identifies which column contains the provenance information, if
-           None the provenance column is generated.

        Returns
        -------
@@ -44,7 +40,6 @@
            "flux_col": flux_col,
            "err_col": err_col,
            "band_col": band_col,
-           "provenance_col": provenance_col,
        }

        self.required = [
@@ -53,7 +48,6 @@
            Column("flux_col", True),
            Column("err_col", True),
            Column("band_col", True),
-           Column("provenance_col", False),
        ]

        self.known_maps = {"ZTF": ZTFColumnMapper}
@@ -76,8 +70,8 @@ def use_known_map(self, map_id):
        Returns
        -------
-       A ColumnMapper subclass object dependent on the map_id provided, for example
-       ZTFColumnMapper in the case of "ZTF"
+       A ColumnMapper subclass object dependent on the map_id provided,
+       ZTFColumnMapper in the case of "ZTF" for example
        """
        if map_id in self.known_maps:
@@ -122,7 +116,6 @@ def assign(
        flux_col=None,
        err_col=None,
        band_col=None,
-       provenance_col=None,
    ):
        """Updates a given set of columns
@@ -144,17 +137,13 @@
        nobs_tot_col: 'str', optional
            Identifies which column contains the total number of observations,
            if available in the input object file
-       provenance_col: 'str', optional
-           Identifies which column contains the provenance information, if
-           None the provenance column is generated.
        """
        assign_map = {
            "id_col": id_col,
            "time_col": time_col,
            "flux_col": flux_col,
            "err_col": err_col,
            "band_col": band_col,
-           "provenance_col": provenance_col,
        }

        for item in assign_map.items():
@@ -175,7 +164,6 @@ def _set_known_map(self):
            "flux_col": "psFlux",
            "err_col": "psFluxErr",
            "band_col": "filterName",
-           "provenance_col": None,
        }
        return self

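With these hunks, a ColumnMapper is fully specified by its five required columns. A quick sketch of the reduced mapping (assumes the post-commit ColumnMapper shown earlier; column names are illustrative):

cmap = ColumnMapper(
    id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band"
)
assert cmap.is_ready()  # all required columns are assigned
assert set(cmap.map) == {"id_col", "time_col", "flux_col", "err_col", "band_col"}
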
tests/tape_tests/test_ensemble.py (11 changes: 2 additions & 9 deletions)
@@ -91,7 +91,6 @@ def test_parquet_construction(data_fixture, request):
        parquet_ensemble._flux_col,
        parquet_ensemble._err_col,
        parquet_ensemble._band_col,
-       parquet_ensemble._provenance_col,
    ]:
        # Check to make sure the critical quantity labels are bound to real columns
        assert parquet_ensemble.source[col] is not None
@@ -488,7 +487,6 @@ def test_insert(parquet_ensemble):
    flux_col = parquet_ensemble._flux_col
    err_col = parquet_ensemble._err_col
    band_col = parquet_ensemble._band_col
-   prov_col = parquet_ensemble._provenance_col

    # Test an insertion of 5 observations.
    new_inds = [2, 1, 100, 110, 111]
@@ -512,7 +510,6 @@
        assert new_source.loc[new_inds[i]][flux_col] == new_fluxes[i]
        assert new_source.loc[new_inds[i]][err_col] == new_errs[i]
        assert new_source.loc[new_inds[i]][band_col] == new_bands[i]
-       assert new_source.loc[new_inds[i]][prov_col] == "custom"

    # Check that all of the old data is still in there.
    obj_ids = old_source.index.unique()
@@ -553,7 +550,6 @@ def test_insert_paritioned(dask_client):
        flux_col="flux",
        err_col="err",
        band_col="band",
-       provenance_col="provenance",
    )
    ens.from_source_dict(rows, column_mapper=cmap, npartitions=4, sort=True)

@@ -725,14 +721,12 @@ def test_update_column_map(dask_client):
    assert cmap_1.map["flux_col"] == "flux"
    assert cmap_1.map["err_col"] == "err"
    assert cmap_1.map["band_col"] == "band"
-   assert cmap_1.map["provenance_col"] is None

    # Update the column map.
-   ens.update_column_mapping(flux_col="f2", provenance_col="p")
+   ens.update_column_mapping(flux_col="f2")

-   # Check that the flux and provenance columns have been updates.
+   # Check that the flux column has been updated.
    assert ens._flux_col == "f2"
-   assert ens._provenance_col == "p"

    # Check that we retrieve the updated column map.
    cmap_2 = ens.make_column_map()
@@ -741,7 +735,6 @@
    assert cmap_2.map["flux_col"] == "f2"
    assert cmap_2.map["err_col"] == "err"
    assert cmap_2.map["band_col"] == "band"
-   assert cmap_2.map["provenance_col"] == "p"


@pytest.mark.parametrize(
tests/tape_tests/test_utils.py (6 changes: 0 additions & 6 deletions)
@@ -22,16 +22,12 @@ def test_column_mapper():

    assert col_map.is_ready() # col_map should now be ready

-   # Assign the remaining columns
-   col_map.assign(provenance_col="provenance")
-
    expected_map = {
        "id_col": "id",
        "time_col": "time",
        "flux_col": "flux",
        "err_col": "err",
        "band_col": "band",
-       "provenance_col": "provenance",
    }

    assert col_map.map == expected_map # The expected mapping
@@ -48,7 +44,6 @@ def test_column_mapper_init():
        flux_col="flux",
        err_col="err",
        band_col="band",
-       provenance_col="provenance",
    )

    assert col_map.is_ready() # col_map should be ready
@@ -59,7 +54,6 @@
        "flux_col": "flux",
        "err_col": "err",
        "band_col": "band",
-       "provenance_col": "provenance",
    }

    assert col_map.map == expected_map # The expected mapping