From 33ca4270860ac1ef1df3de98048a6f9b994ebc2a Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Thu, 30 Nov 2023 15:49:05 -0800 Subject: [PATCH] Fix table syncing to use inner joins. --- src/tape/ensemble.py | 9 +++++---- tests/tape_tests/test_ensemble.py | 23 ++++++++++++++++++----- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 35eb7ead..99d2fa82 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1550,8 +1550,8 @@ def _sync_tables(self): # Lazily Create an empty object table (just index) for joining empty_obj = self._object.map_partitions(lambda x: pd.DataFrame(index=x.index)) - # Join source onto the empty object table to align - self._source = empty_obj.join(self._source) + # Join source onto the empty object table to remove IDs not present in both tables + self._source = self._source.join(empty_obj, how='inner') else: warnings.warn("Divisions are not known, syncing using a non-lazy method.") obj_idx = list(self._object.index.compute()) @@ -1570,8 +1570,9 @@ def _sync_tables(self): # Lazily Create an empty source table (just unique indexes) for joining empty_src = self._source.map_partitions(lambda x: pd.DataFrame(index=x.index.unique())) - # Join object onto the empty unique source table to align - self._object = empty_src.join(self._object) + # Join object onto the empty unique source table to remove IDs not present in + # both tables + self._object = self._object.join(empty_src, how='inner') else: warnings.warn("Divisions are not known, syncing using a non-lazy method.") diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index f4bb7df2..7c8a96ba 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -595,16 +595,29 @@ def test_sync_tables(data_fixture, request): parquet_ensemble.dropna(table="source") assert parquet_ensemble._source_dirty # Dropna should set the source dirty flag - # Drop a whole object to test that the object is dropped in the object table - parquet_ensemble.query(f"{parquet_ensemble._id_col} != 88472935274829959", table="source") + # Drop a whole object from Source to test that the object is dropped in the object table + dropped_obj_id = 88472935274829959 + parquet_ensemble.query(f"{parquet_ensemble._id_col} != {dropped_obj_id}", table="source") - parquet_ensemble._sync_tables() + # Marks the Object table as dirty without triggering a sync. This is good to test since + # we always sync the object table first. + parquet_ensemble.dropna("object") + + # Verify that the object ID we removed from the source table is present in the object table + assert dropped_obj_id in parquet_ensemble._object.index.compute().values - # both tables should have the expected number of rows after a sync + # Perform an operation which should trigger syncing both tables. + parquet_ensemble.compute() + + # Both tables should have the expected number of rows after a sync assert len(parquet_ensemble.compute("object")) == 4 assert len(parquet_ensemble.compute("source")) == 1063 - # dirty flags should be unset after sync + # Validate that the filtered object has been removed from both tables. + assert dropped_obj_id not in parquet_ensemble._source.index.compute().values + assert dropped_obj_id not in parquet_ensemble._object.index.compute().values + + # Dirty flags should be unset after sync assert not parquet_ensemble._object_dirty assert not parquet_ensemble._source_dirty