Fix table syncing to use inner joins. #303

Merged · 2 commits · Dec 1, 2023
src/tape/ensemble.py (5 additions, 4 deletions)
@@ -1550,8 +1550,8 @@ def _sync_tables(self):
     # Lazily Create an empty object table (just index) for joining
     empty_obj = self._object.map_partitions(lambda x: pd.DataFrame(index=x.index))
 
-    # Join source onto the empty object table to align
-    self._source = empty_obj.join(self._source)
+    # Join source onto the empty object table to remove IDs not present in both tables
+    self._source = self._source.join(empty_obj, how="inner")
 else:
     warnings.warn("Divisions are not known, syncing using a non-lazy method.")
     obj_idx = list(self._object.index.compute())
@@ -1570,8 +1570,9 @@ def _sync_tables(self):
     # Lazily Create an empty source table (just unique indexes) for joining
     empty_src = self._source.map_partitions(lambda x: pd.DataFrame(index=x.index.unique()))
 
-    # Join object onto the empty unique source table to align
-    self._object = empty_src.join(self._object)
+    # Join object onto the empty unique source table to remove IDs not present in
+    # both tables
+    self._object = self._object.join(empty_src, how="inner")
 
 else:
     warnings.warn("Divisions are not known, syncing using a non-lazy method.")
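To make the change concrete, here is a minimal pandas sketch of the two join behaviours (plain pandas rather than Dask, with made-up IDs and columns; the per-partition map_partitions trick above has the same join semantics):

import pandas as pd

# Hypothetical miniature tables; IDs and columns are illustrative only.
object_df = pd.DataFrame({"nobs": [2, 0, 0]}, index=[1, 2, 3])
source_df = pd.DataFrame({"flux": [10.0, 20.0, 30.0]}, index=[1, 1, 99])

# Index-only frame, mirroring pd.DataFrame(index=x.index) in the diff above.
empty_obj = pd.DataFrame(index=object_df.index)

# Old behaviour: a left join keyed on the object index keeps object IDs 2 and 3
# as all-NaN source rows even though they have no sources.
old_source = empty_obj.join(source_df)
print(old_source.index.tolist())  # [1, 1, 2, 3]

# New behaviour: an inner join keeps only IDs present in both tables, so the
# source-only ID 99 and the source-less IDs 2 and 3 are all dropped.
new_source = source_df.join(empty_obj, how="inner")
print(new_source.index.tolist())  # [1, 1]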
tests/tape_tests/test_ensemble.py (18 additions, 5 deletions)
@@ -595,16 +595,29 @@ def test_sync_tables(data_fixture, request):
     parquet_ensemble.dropna(table="source")
     assert parquet_ensemble._source_dirty  # Dropna should set the source dirty flag
 
-    # Drop a whole object to test that the object is dropped in the object table
-    parquet_ensemble.query(f"{parquet_ensemble._id_col} != 88472935274829959", table="source")
+    # Drop a whole object from Source to test that the object is dropped in the object table
+    dropped_obj_id = 88472935274829959
+    parquet_ensemble.query(f"{parquet_ensemble._id_col} != {dropped_obj_id}", table="source")
 
-    parquet_ensemble._sync_tables()
+    # Marks the Object table as dirty without triggering a sync. This is good to test since
+    # we always sync the object table first.
+    parquet_ensemble.dropna("object")
+
+    # Verify that the object ID we removed from the source table is present in the object table
+    assert dropped_obj_id in parquet_ensemble._object.index.compute().values
 
-    # both tables should have the expected number of rows after a sync
+    # Perform an operation which should trigger syncing both tables.
+    parquet_ensemble.compute()
+
+    # Both tables should have the expected number of rows after a sync
     assert len(parquet_ensemble.compute("object")) == 4
     assert len(parquet_ensemble.compute("source")) == 1063
 
-    # dirty flags should be unset after sync
+    # Validate that the filtered object has been removed from both tables.
+    assert dropped_obj_id not in parquet_ensemble._source.index.compute().values
+    assert dropped_obj_id not in parquet_ensemble._object.index.compute().values
+
+    # Dirty flags should be unset after sync
     assert not parquet_ensemble._object_dirty
     assert not parquet_ensemble._source_dirty
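For readers skimming the test, the overall pattern it exercises can be summarized in a small self-contained sketch. The class below is hypothetical (invented names, not TAPE's actual Ensemble API); it only reproduces the dirty-flag bookkeeping and the object-first inner-join sync shown in the diffs:

import pandas as pd

class TwoTableSync:
    """Hypothetical miniature of the dirty-flag sync pattern, not TAPE's API."""

    def __init__(self, object_df, source_df):
        self._object, self._source = object_df, source_df
        self._object_dirty = False
        self._source_dirty = False

    def query(self, expr, table):
        # Filtering a table marks it dirty; syncing is deferred until compute().
        if table == "object":
            self._object = self._object.query(expr)
            self._object_dirty = True
        else:
            self._source = self._source.query(expr)
            self._source_dirty = True

    def _sync_tables(self):
        # The object table is synced first, matching the ordering the test relies on.
        if self._object_dirty:
            # Inner join drops source rows whose ID no longer exists in object.
            self._source = self._source.join(
                pd.DataFrame(index=self._object.index), how="inner"
            )
        if self._source_dirty:
            # Inner join drops object rows that no longer have any source rows.
            self._object = self._object.join(
                pd.DataFrame(index=self._source.index.unique()), how="inner"
            )
        self._object_dirty = self._source_dirty = False

    def compute(self):
        self._sync_tables()
        return self._object, self._source

# Usage mirroring the test: dropping every source row for an ID removes that
# ID from the object table after the next compute().
obj = pd.DataFrame({"n": [2, 1]}, index=[1, 2])
src = pd.DataFrame({"flux": [1.0, 2.0, 3.0]}, index=[1, 1, 2])
ens = TwoTableSync(obj, src)
ens.query("index != 2", table="source")
obj_out, src_out = ens.compute()
assert 2 not in obj_out.index and 2 not in src_out.index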