Skip to content

Commit

Permalink
Fix table syncing to use inner joins.
Browse files Browse the repository at this point in the history
  • Loading branch information
wilsonbb committed Nov 30, 2023
1 parent 6f85220 commit 33ca427
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 9 deletions.
9 changes: 5 additions & 4 deletions src/tape/ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,8 +1550,8 @@ def _sync_tables(self):
# Lazily Create an empty object table (just index) for joining
empty_obj = self._object.map_partitions(lambda x: pd.DataFrame(index=x.index))

# Join source onto the empty object table to align
self._source = empty_obj.join(self._source)
# Join source onto the empty object table to remove IDs not present in both tables
self._source = self._source.join(empty_obj, how='inner')
else:
warnings.warn("Divisions are not known, syncing using a non-lazy method.")
obj_idx = list(self._object.index.compute())
Expand All @@ -1570,8 +1570,9 @@ def _sync_tables(self):
# Lazily Create an empty source table (just unique indexes) for joining
empty_src = self._source.map_partitions(lambda x: pd.DataFrame(index=x.index.unique()))

# Join object onto the empty unique source table to align
self._object = empty_src.join(self._object)
# Join object onto the empty unique source table to remove IDs not present in
# both tables
self._object = self._object.join(empty_src, how='inner')

else:
warnings.warn("Divisions are not known, syncing using a non-lazy method.")
Expand Down
23 changes: 18 additions & 5 deletions tests/tape_tests/test_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,16 +595,29 @@ def test_sync_tables(data_fixture, request):
parquet_ensemble.dropna(table="source")
assert parquet_ensemble._source_dirty # Dropna should set the source dirty flag

# Drop a whole object to test that the object is dropped in the object table
parquet_ensemble.query(f"{parquet_ensemble._id_col} != 88472935274829959", table="source")
# Drop a whole object from Source to test that the object is dropped in the object table
dropped_obj_id = 88472935274829959
parquet_ensemble.query(f"{parquet_ensemble._id_col} != {dropped_obj_id}", table="source")

parquet_ensemble._sync_tables()
# Marks the Object table as dirty without triggering a sync. This is good to test since
# we always sync the object table first.
parquet_ensemble.dropna("object")

# Verify that the object ID we removed from the source table is present in the object table
assert dropped_obj_id in parquet_ensemble._object.index.compute().values

# both tables should have the expected number of rows after a sync
# Perform an operation which should trigger syncing both tables.
parquet_ensemble.compute()

# Both tables should have the expected number of rows after a sync
assert len(parquet_ensemble.compute("object")) == 4
assert len(parquet_ensemble.compute("source")) == 1063

# dirty flags should be unset after sync
# Validate that the filtered object has been removed from both tables.
assert dropped_obj_id not in parquet_ensemble._source.index.compute().values
assert dropped_obj_id not in parquet_ensemble._object.index.compute().values

# Dirty flags should be unset after sync
assert not parquet_ensemble._object_dirty
assert not parquet_ensemble._source_dirty

Expand Down

0 comments on commit 33ca427

Please sign in to comment.